source: trunk/libtransmission/peer-mgr.c @ 12012

Last change on this file since 12012 was 12012, checked in by jordan, 11 years ago

(trunk libT) #4048 "use bitsets instead of bitfield in tr_completion" -- done.

Excuse the sprawl. Much of this didn't fit into self-contained commits.

  • Property svn:keywords set to Date Rev Author Id
File size: 113.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12012 2011-02-23 03:54:04Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "utils.h"
41#include "webseed.h"
42
/* Tunable constants for the peer manager: timer periods, idle limits,
 * connection throttles, and the bit values used in peer_atom.flags2. */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second throttle scaled down to one reconnect pulse.
     * This is pure integer arithmetic on purpose: an enumerator must be an
     * integer constant expression, and a cast applied to a floating-point
     * *expression* (as opposed to a floating constant) is not one. */
    MAX_CONNECTIONS_PER_PULSE = ( ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000 ),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    /* NOTE(review): undocumented in the original; not referenced in the
     * portion of the file reviewed here */
    NO_BLOCKS_CANCEL_HISTORY = 120,

    /* first argument handed to tr_historyNew() for each per-peer history */
    CANCEL_HISTORY_SEC = 60
};
101
102const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
/* Values for peer_atom.uploadOnly.
 * (The "UKNOWN" spelling is kept as-is because it is an identifier.) */
enum
{
    UPLOAD_ONLY_UKNOWN, /* not determined yet */
    UPLOAD_ONLY_YES,
    UPLOAD_ONLY_NO
};
114
115/**
116 * Peer information that should be kept even before we've connected and
117 * after we've disconnected. These are kept in a pool of peer_atoms to decide
118 * which ones would make good candidates for connecting to, and to watch out
119 * for banned peers.
120 *
121 * @see tr_peer
122 * @see tr_peermsgs
123 */
124struct peer_atom
125{
126    uint8_t     fromFirst;          /* where the peer was first found */
127    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
128    uint8_t     flags;              /* these match the added_f flags */
129    uint8_t     flags2;             /* flags that aren't defined in added_f */
130    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
131    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
132    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */
133
134    tr_port     port;
135    tr_bool     utp_failed;         /* We recently failed to connect over uTP */
136    uint16_t    numFails;
137    time_t      time;               /* when the peer's connection status last changed */
138    time_t      piece_data_time;
139
140    time_t      lastConnectionAttemptAt;
141    time_t      lastConnectionAt;
142
143    /* similar to a TTL field, but less rigid --
144     * if the swarm is small, the atom will be kept past this date. */
145    time_t      shelf_date;
146    tr_peer   * peer;               /* will be NULL if not connected */
147    tr_address  addr;
148};
149
150#ifdef NDEBUG
151#define tr_isAtom(a) (TRUE)
152#else
153static tr_bool
154tr_isAtom( const struct peer_atom * atom )
155{
156    return ( atom != NULL )
157        && ( atom->fromFirst < TR_PEER_FROM__MAX )
158        && ( atom->fromBest < TR_PEER_FROM__MAX )
159        && ( tr_isAddress( &atom->addr ) );
160}
161#endif
162
163static const char*
164tr_atomAddrStr( const struct peer_atom * atom )
165{
166    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
167}
168
169struct block_request
170{
171    tr_block_index_t block;
172    tr_peer * peer;
173    time_t sentAt;
174};
175
176struct weighted_piece
177{
178    tr_piece_index_t index;
179    int16_t salt;
180    int16_t requestCount;
181};
182
/* Records how Torrent::pieces is currently ordered. */
enum piece_sort_state
{
    PIECES_UNSORTED,          /* order is unknown / has been invalidated */
    PIECES_SORTED_BY_INDEX,   /* ordered by comparePieceByIndex() */
    PIECES_SORTED_BY_WEIGHT   /* ordered by comparePieceByWeight() */
};
189
190/** @brief Opaque, per-torrent data structure for peer connection information */
191typedef struct tr_torrent_peers
192{
193    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
194    tr_ptrArray                pool; /* struct peer_atom */
195    tr_ptrArray                peers; /* tr_peer */
196    tr_ptrArray                webseeds; /* tr_webseed */
197
198    tr_torrent               * tor;
199    struct tr_peerMgr        * manager;
200
201    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
202    int                        optimisticUnchokeTimeScaler;
203
204    tr_bool                    isRunning;
205    tr_bool                    needsCompletenessCheck;
206
207    struct block_request     * requests;
208    int                        requestCount;
209    int                        requestAlloc;
210
211    struct weighted_piece    * pieces;
212    int                        pieceCount;
213    enum piece_sort_state      pieceSortState;
214
215    /* An array of pieceCount items stating how many peers have each piece.
216       This is used to help us for downloading pieces "rarest first."
217       This may be NULL if we don't have metainfo yet, or if we're not
218       downloading and don't care about rarity */
219    uint16_t                 * pieceReplication;
220    size_t                     pieceReplicationSize;
221
222    int                        interestedCount;
223    int                        maxPeers;
224    time_t                     lastCancel;
225
226    /* Before the endgame this should be 0. In endgame, is contains the average
227     * number of pending requests per peer. Only peers which have more pending
228     * requests are considered 'fast' are allowed to request a block that's
229     * already been requested from another (slower?) peer. */
230    int                        endgame;
231}
232Torrent;
233
234struct tr_peerMgr
235{
236    tr_session    * session;
237    tr_ptrArray     incomingHandshakes; /* tr_handshake */
238    struct event  * bandwidthTimer;
239    struct event  * rechokeTimer;
240    struct event  * refillUpkeepTimer;
241    struct event  * atomTimer;
242};
243
/* Deep-log a printf-style message tagged with the name of the torrent that
 * owns the Torrent `t`. Does nothing unless deep logging is active.
 * The argument is parenthesized so that any pointer expression may be
 * passed, not just a plain identifier. */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( ( t )->tor ), __VA_ARGS__ ); \
    } while( 0 )

/* Deep-log a printf-style message that isn't tied to any torrent.
 * Does nothing unless deep logging is active. */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
255
256/**
257***
258**/
259
260static inline void
261managerLock( const struct tr_peerMgr * manager )
262{
263    tr_sessionLock( manager->session );
264}
265
266static inline void
267managerUnlock( const struct tr_peerMgr * manager )
268{
269    tr_sessionUnlock( manager->session );
270}
271
272static inline void
273torrentLock( Torrent * torrent )
274{
275    managerLock( torrent->manager );
276}
277
278static inline void
279torrentUnlock( Torrent * torrent )
280{
281    managerUnlock( torrent->manager );
282}
283
284static inline int
285torrentIsLocked( const Torrent * t )
286{
287    return tr_sessionIsLocked( t->manager->session );
288}
289
290/**
291***
292**/
293
294static int
295handshakeCompareToAddr( const void * va, const void * vb )
296{
297    const tr_handshake * a = va;
298
299    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
300}
301
/* Comparator for two tr_handshake objects, ordering them by address. */
static int
handshakeCompare( const void * a, const void * b )
{
    const void * const addrOfB = tr_handshakeGetAddr( b, NULL );

    return handshakeCompareToAddr( a, addrOfB );
}
307
308static inline tr_handshake*
309getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
310{
311    if( tr_ptrArrayEmpty( handshakes ) )
312        return NULL;
313
314    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
315}
316
317static int
318comparePeerAtomToAddress( const void * va, const void * vb )
319{
320    const struct peer_atom * a = va;
321
322    return tr_compareAddresses( &a->addr, vb );
323}
324
325static int
326compareAtomsByAddress( const void * va, const void * vb )
327{
328    const struct peer_atom * b = vb;
329
330    assert( tr_isAtom( b ) );
331
332    return comparePeerAtomToAddress( va, &b->addr );
333}
334
335/**
336***
337**/
338
339const tr_address *
340tr_peerAddress( const tr_peer * peer )
341{
342    return &peer->atom->addr;
343}
344
345static Torrent*
346getExistingTorrent( tr_peerMgr *    manager,
347                    const uint8_t * hash )
348{
349    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
350
351    return tor == NULL ? NULL : tor->torrentPeers;
352}
353
354static int
355peerCompare( const void * a, const void * b )
356{
357    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
358}
359
360static struct peer_atom*
361getExistingAtom( const Torrent    * t,
362                 const tr_address * addr )
363{
364    Torrent * tt = (Torrent*)t;
365    assert( torrentIsLocked( t ) );
366    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
367}
368
369static tr_bool
370peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
371{
372    Torrent * t = (Torrent*) ct;
373
374    assert( torrentIsLocked ( t ) );
375
376    return ( atom->peer != NULL )
377        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
378        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
379}
380
381static tr_peer*
382peerConstructor( struct peer_atom * atom )
383{
384    tr_peer * peer = tr_new0( tr_peer, 1 );
385
386    peer->have = TR_BITSET_INIT;
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
394    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
395
396    return peer;
397}
398
399static tr_peer*
400getPeer( Torrent * torrent, struct peer_atom * atom )
401{
402    tr_peer * peer;
403
404    assert( torrentIsLocked( torrent ) );
405
406    peer = atom->peer;
407
408    if( peer == NULL )
409    {
410        peer = peerConstructor( atom );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419static void
420peerDestructor( Torrent * t, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    peerDeclinedAllRequests( t, peer );
425
426    if( peer->msgs != NULL )
427        tr_peerMsgsFree( peer->msgs );
428
429    tr_peerIoClear( peer->io );
430    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
431
432    tr_historyFree( peer->blocksSentToClient  );
433    tr_historyFree( peer->blocksSentToPeer    );
434    tr_historyFree( peer->cancelsSentToClient );
435    tr_historyFree( peer->cancelsSentToPeer   );
436
437    tr_bitsetDestruct( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440    peer->atom->peer = NULL;
441
442    tr_free( peer );
443}
444
445static tr_bool
446replicationExists( const Torrent * t )
447{
448    return t->pieceReplication != NULL;
449}
450
451static void
452replicationFree( Torrent * t )
453{
454    tr_free( t->pieceReplication );
455    t->pieceReplication = NULL;
456    t->pieceReplicationSize = 0;
457}
458
459static void
460replicationNew( Torrent * t )
461{
462    tr_piece_index_t piece_i;
463    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
464    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
465    const int peer_count = tr_ptrArraySize( &t->peers );
466
467    assert( !replicationExists( t ) );
468
469    t->pieceReplicationSize = piece_count;
470    t->pieceReplication = tr_new0( uint16_t, piece_count );
471
472    for( piece_i=0; piece_i<piece_count; ++piece_i )
473    {
474        int peer_i;
475        uint16_t r = 0;
476
477        for( peer_i=0; peer_i<peer_count; ++peer_i )
478            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
479                ++r;
480
481        t->pieceReplication[piece_i] = r;
482    }
483}
484
485static void
486torrentDestructor( void * vt )
487{
488    Torrent * t = vt;
489
490    assert( t );
491    assert( !t->isRunning );
492    assert( torrentIsLocked( t ) );
493    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
494    assert( tr_ptrArrayEmpty( &t->peers ) );
495
496    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
497    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
498    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
499    tr_ptrArrayDestruct( &t->peers, NULL );
500
501    replicationFree( t );
502
503    tr_free( t->requests );
504    tr_free( t->pieces );
505    tr_free( t );
506}
507
508static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
509
510static Torrent*
511torrentConstructor( tr_peerMgr * manager,
512                    tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    return m;
542}
543
/* Free the event that `*t` points to, if any, and reset `*t` to NULL. */
static void
deleteTimer( struct event ** t )
{
    struct event * const timer = *t;

    if( timer == NULL )
        return;

    event_free( timer );
    *t = NULL;
}
553
554static void
555deleteTimers( struct tr_peerMgr * m )
556{
557    deleteTimer( &m->atomTimer );
558    deleteTimer( &m->bandwidthTimer );
559    deleteTimer( &m->rechokeTimer );
560    deleteTimer( &m->refillUpkeepTimer );
561}
562
563void
564tr_peerMgrFree( tr_peerMgr * manager )
565{
566    managerLock( manager );
567
568    deleteTimers( manager );
569
570    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
571     * the item from manager->handshakes, so this is a little roundabout... */
572    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
573        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
574
575    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
576
577    managerUnlock( manager );
578    tr_free( manager );
579}
580
581static int
582clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
583{
584    if( !tr_torrentHasMetadata( tor ) )
585        return TRUE;
586
587    return peer->clientIsInterested && !peer->clientIsChoked;
588}
589
590static int
591clientIsUploadingTo( const tr_peer * peer )
592{
593    return peer->peerIsInterested && !peer->peerIsChoked;
594}
595
596/***
597****
598***/
599
600void
601tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
602{
603    tr_torrent * tor = NULL;
604    tr_session * session = mgr->session;
605
606    /* we cache whether or not a peer is blocklisted...
607       since the blocklist has changed, erase that cached value */
608    while(( tor = tr_torrentNext( session, tor )))
609    {
610        int i;
611        Torrent * t = tor->torrentPeers;
612        const int n = tr_ptrArraySize( &t->pool );
613        for( i=0; i<n; ++i ) {
614            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
615            atom->blocklisted = -1;
616        }
617    }
618}
619
620static tr_bool
621isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
622{
623    if( atom->blocklisted < 0 )
624        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
625
626    assert( tr_isBool( atom->blocklisted ) );
627    return atom->blocklisted;
628}
629
630
631/***
632****
633***/
634
635static void
636atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
637{
638    assert( atom != NULL );
639    assert( -1<=seedProbability && seedProbability<=100 );
640
641    atom->seedProbability = seedProbability;
642
643    if( seedProbability == 100 )
644        atom->flags |= ADDED_F_SEED_FLAG;
645    else if( seedProbability != -1 )
646        atom->flags &= ~ADDED_F_SEED_FLAG;
647}
648
/* Mark the atom as definitely being a seed. */
static void
atomSetSeed( struct peer_atom * atom )
{
    const int certainlySeed = 100;

    atomSetSeedProbability( atom, certainlySeed );
}
654
655static inline tr_bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661tr_bool
662tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
663                      const tr_address  * addr )
664{
665    tr_bool isSeed = FALSE;
666    const Torrent * t = tor->torrentPeers;
667    const struct peer_atom * atom = getExistingAtom( t, addr );
668
669    if( atom )
670        isSeed = atomIsSeed( atom );
671
672    return isSeed;
673}
674
675void
676tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
677{
678    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
679
680    if( atom )
681        atom->flags |= ADDED_F_UTP_FLAGS;
682}
683
684void
685tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, tr_bool failed )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->utp_failed = failed;
691}
692
693
694/**
695***  REQUESTS
696***
697*** There are two data structures associated with managing block requests:
698***
699*** 1. Torrent::requests, an array of "struct block_request" which keeps
700***    track of which blocks have been requested, and when, and by which peers.
701***    This list is used for (a) cancelling requests that have been pending
702***    for too long and (b) avoiding duplicate requests before endgame.
703***
704*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
705***    pieces that we want to request. It's used to decide which blocks to
706***    return next when tr_peerMgrGetBlockRequests() is called.
707**/
708
709/**
710*** struct block_request
711**/
712
713static int
714compareReqByBlock( const void * va, const void * vb )
715{
716    const struct block_request * a = va;
717    const struct block_request * b = vb;
718
719    /* primary key: block */
720    if( a->block < b->block ) return -1;
721    if( a->block > b->block ) return 1;
722
723    /* secondary key: peer */
724    if( a->peer < b->peer ) return -1;
725    if( a->peer > b->peer ) return 1;
726
727    return 0;
728}
729
730static void
731requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
732{
733    struct block_request key;
734
735    /* ensure enough room is available... */
736    if( t->requestCount + 1 >= t->requestAlloc )
737    {
738        const int CHUNK_SIZE = 128;
739        t->requestAlloc += CHUNK_SIZE;
740        t->requests = tr_renew( struct block_request,
741                                t->requests, t->requestAlloc );
742    }
743
744    /* populate the record we're inserting */
745    key.block = block;
746    key.peer = peer;
747    key.sentAt = tr_time( );
748
749    /* insert the request to our array... */
750    {
751        tr_bool exact;
752        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
753                                       sizeof( struct block_request ),
754                                       compareReqByBlock, &exact );
755        assert( !exact );
756        memmove( t->requests + pos + 1,
757                 t->requests + pos,
758                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
759        t->requests[pos] = key;
760    }
761
762    if( peer != NULL )
763    {
764        ++peer->pendingReqsToPeer;
765        assert( peer->pendingReqsToPeer >= 0 );
766    }
767
768    /*fprintf( stderr, "added request of block %lu from peer %s... "
769                       "there are now %d block\n",
770                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
771}
772
773static struct block_request *
774requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
775{
776    struct block_request key;
777    key.block = block;
778    key.peer = (tr_peer*) peer;
779
780    return bsearch( &key, t->requests, t->requestCount,
781                    sizeof( struct block_request ),
782                    compareReqByBlock );
783}
784
785/**
786 * Find the peers are we currently requesting the block
787 * with index @a block from and append them to @a peerArr.
788 */
789static void
790getBlockRequestPeers( Torrent * t, tr_block_index_t block,
791                      tr_ptrArray * peerArr )
792{
793    tr_bool exact;
794    int i, pos;
795    struct block_request key;
796
797    key.block = block;
798    key.peer = NULL;
799    pos = tr_lowerBound( &key, t->requests, t->requestCount,
800                         sizeof( struct block_request ),
801                         compareReqByBlock, &exact );
802
803    assert( !exact ); /* shouldn't have a request with .peer == NULL */
804
805    for( i = pos; i < t->requestCount; ++i )
806    {
807        if( t->requests[i].block != block )
808            break;
809        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
810    }
811}
812
813static void
814decrementPendingReqCount( const struct block_request * b )
815{
816    if( b->peer != NULL )
817        if( b->peer->pendingReqsToPeer > 0 )
818            --b->peer->pendingReqsToPeer;
819}
820
821static void
822requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
823{
824    const struct block_request * b = requestListLookup( t, block, peer );
825    if( b != NULL )
826    {
827        const int pos = b - t->requests;
828        assert( pos < t->requestCount );
829
830        decrementPendingReqCount( b );
831
832        tr_removeElementFromArray( t->requests,
833                                   pos,
834                                   sizeof( struct block_request ),
835                                   t->requestCount-- );
836
837        /*fprintf( stderr, "removing request of block %lu from peer %s... "
838                           "there are now %d block requests left\n",
839                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
840    }
841}
842
843static int
844countActiveWebseeds( const Torrent * t )
845{
846    int activeCount = 0;
847    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
848    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
849
850    for( ; w!=wend; ++w )
851        if( tr_webseedIsActive( *w ) )
852            ++activeCount;
853
854    return activeCount;
855}
856
857static void
858updateEndgame( Torrent * t )
859{
860    const tr_torrent * tor = t->tor;
861    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
862
863    assert( t->requestCount >= 0 );
864
865    if( (tr_block_index_t) t->requestCount < missing )
866    {
867        /* not in endgame */
868        t->endgame = 0;
869    }
870    else if( !t->endgame ) /* only recalculate when endgame first begins */
871    {
872        int numDownloading = 0;
873        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
874        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
875
876        /* add the active bittorrent peers... */
877        for( ; p!=pend; ++p )
878            if( (*p)->pendingReqsToPeer > 0 )
879                ++numDownloading;
880
881        /* add the active webseeds... */
882        numDownloading += countActiveWebseeds( t );
883
884        /* average number of pending requests per downloading peer */
885        t->endgame = t->requestCount / MAX( numDownloading, 1 );
886    }
887}
888
889
890/****
891*****
892*****  Piece List Manipulation / Accessors
893*****
894****/
895
896static inline void
897invalidatePieceSorting( Torrent * t )
898{
899    t->pieceSortState = PIECES_UNSORTED;
900}
901
902const tr_torrent * weightTorrent;
903
904const uint16_t * weightReplication;
905
906static void
907setComparePieceByWeightTorrent( Torrent * t )
908{
909    if( !replicationExists( t ) )
910        replicationNew( t );
911
912    weightTorrent = t->tor;
913    weightReplication = t->pieceReplication;
914}
915
916/* we try to create a "weight" s.t. high-priority pieces come before others,
917 * and that partially-complete pieces come before empty ones. */
918static int
919comparePieceByWeight( const void * va, const void * vb )
920{
921    const struct weighted_piece * a = va;
922    const struct weighted_piece * b = vb;
923    int ia, ib, missing, pending;
924    const tr_torrent * tor = weightTorrent;
925    const uint16_t * rep = weightReplication;
926
927    /* primary key: weight */
928    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
929    pending = a->requestCount;
930    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
931    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
932    pending = b->requestCount;
933    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
934    if( ia < ib ) return -1;
935    if( ia > ib ) return 1;
936
937    /* secondary key: higher priorities go first */
938    ia = tor->info.pieces[a->index].priority;
939    ib = tor->info.pieces[b->index].priority;
940    if( ia > ib ) return -1;
941    if( ia < ib ) return 1;
942
943    /* tertiary key: rarest first. */
944    ia = rep[a->index];
945    ib = rep[b->index];
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* quaternary key: random */
950    if( a->salt < b->salt ) return -1;
951    if( a->salt > b->salt ) return 1;
952
953    /* okay, they're equal */
954    return 0;
955}
956
957static int
958comparePieceByIndex( const void * va, const void * vb )
959{
960    const struct weighted_piece * a = va;
961    const struct weighted_piece * b = vb;
962    if( a->index < b->index ) return -1;
963    if( a->index > b->index ) return 1;
964    return 0;
965}
966
967static void
968pieceListSort( Torrent * t, enum piece_sort_state state )
969{
970    assert( state==PIECES_SORTED_BY_INDEX
971         || state==PIECES_SORTED_BY_WEIGHT );
972
973
974    if( state == PIECES_SORTED_BY_WEIGHT )
975    {
976        setComparePieceByWeightTorrent( t );
977        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
978    }
979    else
980        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
981
982    t->pieceSortState = state;
983}
984
/**
 * These checks are useful for testing, but too expensive for nightly builds,
 * so by default they compile to nothing. To compile the real checks
 * back in, change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Outside of endgame, verify that each piece in t->pieces compares
 * less-than-or-equal to the piece after it by weight. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    int i;

    if( t->endgame )
        return;

    setComparePieceByWeightTorrent( t );
    for( i=1; i<t->pieceCount; ++i )
        assert( comparePieceByWeight( &t->pieces[i-1], &t->pieces[i] ) <= 0 );
}

/* Recount, for every piece, how many connected peers have it, and verify
 * that the replication table holds the same number. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to implementation errors in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such behavior is noticed,
     * a bug report should be filed against the faulty client. */

    size_t piece;
    const uint16_t * rep = t->pieceReplication;
    const size_t n_pieces = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int n_peers = tr_ptrArraySize( &t->peers );

    assert( n_pieces == t->tor->info.pieceCount );

    for( piece=0; piece<n_pieces; ++piece )
    {
        int i;
        uint16_t owners = 0;

        for( i=0; i<n_peers; ++i )
            if( tr_bitsetHas( &peers[i]->have, piece ) )
                ++owners;

        assert( rep[piece] == owners );
    }
}
#endif
1033
1034static struct weighted_piece *
1035pieceListLookup( Torrent * t, tr_piece_index_t index )
1036{
1037    int i;
1038
1039    for( i=0; i<t->pieceCount; ++i )
1040        if( t->pieces[i].index == index )
1041            return &t->pieces[i];
1042
1043    return NULL;
1044}
1045
1046static void
1047pieceListRebuild( Torrent * t )
1048{
1049
1050    if( !tr_torrentIsSeed( t->tor ) )
1051    {
1052        tr_piece_index_t i;
1053        tr_piece_index_t * pool;
1054        tr_piece_index_t poolCount = 0;
1055        const tr_torrent * tor = t->tor;
1056        const tr_info * inf = tr_torrentInfo( tor );
1057        struct weighted_piece * pieces;
1058        int pieceCount;
1059
1060        /* build the new list */
1061        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1062        for( i=0; i<inf->pieceCount; ++i )
1063            if( !inf->pieces[i].dnd )
1064                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1065                    pool[poolCount++] = i;
1066        pieceCount = poolCount;
1067        pieces = tr_new0( struct weighted_piece, pieceCount );
1068        for( i=0; i<poolCount; ++i ) {
1069            struct weighted_piece * piece = pieces + i;
1070            piece->index = pool[i];
1071            piece->requestCount = 0;
1072            piece->salt = tr_cryptoWeakRandInt( 4096 );
1073        }
1074
1075        /* if we already had a list of pieces, merge it into
1076         * the new list so we don't lose its requestCounts */
1077        if( t->pieces != NULL )
1078        {
1079            struct weighted_piece * o = t->pieces;
1080            struct weighted_piece * oend = o + t->pieceCount;
1081            struct weighted_piece * n = pieces;
1082            struct weighted_piece * nend = n + pieceCount;
1083
1084            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1085
1086            while( o!=oend && n!=nend ) {
1087                if( o->index < n->index )
1088                    ++o;
1089                else if( o->index > n->index )
1090                    ++n;
1091                else
1092                    *n++ = *o++;
1093            }
1094
1095            tr_free( t->pieces );
1096        }
1097
1098        t->pieces = pieces;
1099        t->pieceCount = pieceCount;
1100
1101        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1102
1103        /* cleanup */
1104        tr_free( pool );
1105    }
1106}
1107
1108static void
1109pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1110{
1111    struct weighted_piece * p;
1112
1113    if(( p = pieceListLookup( t, piece )))
1114    {
1115        const int pos = p - t->pieces;
1116
1117        tr_removeElementFromArray( t->pieces,
1118                                   pos,
1119                                   sizeof( struct weighted_piece ),
1120                                   t->pieceCount-- );
1121
1122        if( t->pieceCount == 0 )
1123        {
1124            tr_free( t->pieces );
1125            t->pieces = NULL;
1126        }
1127    }
1128}
1129
1130static void
1131pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1132{
1133    int pos;
1134    tr_bool isSorted = TRUE;
1135
1136    if( p == NULL )
1137        return;
1138
1139    /* is the torrent already sorted? */
1140    pos = p - t->pieces;
1141    setComparePieceByWeightTorrent( t );
1142    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1143        isSorted = FALSE;
1144    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1145        isSorted = FALSE;
1146
1147    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1148    {
1149       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1150       isSorted = TRUE;
1151    }
1152
1153    /* if it's not sorted, move it around */
1154    if( !isSorted )
1155    {
1156        tr_bool exact;
1157        const struct weighted_piece tmp = *p;
1158
1159        tr_removeElementFromArray( t->pieces,
1160                                   pos,
1161                                   sizeof( struct weighted_piece ),
1162                                   t->pieceCount-- );
1163
1164        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1165                             sizeof( struct weighted_piece ),
1166                             comparePieceByWeight, &exact );
1167
1168        memmove( &t->pieces[pos + 1],
1169                 &t->pieces[pos],
1170                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1171
1172        t->pieces[pos] = tmp;
1173    }
1174
1175    assertWeightedPiecesAreSorted( t );
1176}
1177
1178static void
1179pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1180{
1181    struct weighted_piece * p;
1182    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1183
1184    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1185    {
1186        --p->requestCount;
1187        pieceListResortPiece( t, p );
1188    }
1189}
1190
1191
1192/****
1193*****
1194*****  Replication count ( for rarest first policy )
1195*****
1196****/
1197
1198/**
1199 * Increase the replication count of this piece and sort it if the
1200 * piece list is already sorted
1201 */
1202static void
1203tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1204{
1205    assert( replicationExists( t ) );
1206    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1207
1208    /* One more replication of this piece is present in the swarm */
1209    ++t->pieceReplication[index];
1210
1211    /* we only resort the piece if the list is already sorted */
1212    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1213        pieceListResortPiece( t, pieceListLookup( t, index ) );
1214}
1215
1216/**
1217 * Increases the replication count of pieces present in the bitfield
1218 */
1219static void
1220tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1221{
1222    size_t i;
1223    uint16_t * rep = t->pieceReplication;
1224    const size_t n = t->tor->info.pieceCount;
1225
1226    assert( replicationExists( t ) );
1227    assert( n == t->pieceReplicationSize );
1228    assert( tr_bitfieldTestFast( b, n-1 ) );
1229
1230    for( i=0; i<n; ++i )
1231        if( tr_bitfieldHas( b, i ) )
1232            ++rep[i];
1233
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        invalidatePieceSorting( t );
1236}
1237
1238/**
1239 * Increase the replication count of every piece
1240 */
1241static void
1242tr_incrReplication( Torrent * t )
1243{
1244    int i;
1245    const int n = t->pieceReplicationSize;
1246
1247    assert( replicationExists( t ) );
1248    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1249
1250    for( i=0; i<n; ++i )
1251        ++t->pieceReplication[i];
1252}
1253
1254/**
1255 * Decrease the replication count of pieces present in the bitset.
1256 */
1257static void
1258tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1259{
1260    int i;
1261    const int n = t->pieceReplicationSize;
1262
1263    assert( replicationExists( t ) );
1264    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1265
1266    if( bitset->haveAll )
1267    {
1268        for( i=0; i<n; ++i )
1269            --t->pieceReplication[i];
1270    }
1271    else if ( !bitset->haveNone )
1272    {
1273        const tr_bitfield * const b = &bitset->bitfield;
1274
1275        for( i=0; i<n; ++i )
1276            if( tr_bitfieldHas( b, i ) )
1277                --t->pieceReplication[i];
1278
1279        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1280            invalidatePieceSorting( t );
1281    }
1282}
1283
1284/**
1285***
1286**/
1287
1288void
1289tr_peerMgrRebuildRequests( tr_torrent * tor )
1290{
1291    assert( tr_isTorrent( tor ) );
1292
1293    pieceListRebuild( tor->torrentPeers );
1294}
1295
1296void
1297tr_peerMgrGetNextRequests( tr_torrent           * tor,
1298                           tr_peer              * peer,
1299                           int                    numwant,
1300                           tr_block_index_t     * setme,
1301                           int                  * numgot )
1302{
1303    int i;
1304    int got;
1305    Torrent * t;
1306    struct weighted_piece * pieces;
1307    const tr_bitset * have = &peer->have;
1308
1309    /* sanity clause */
1310    assert( tr_isTorrent( tor ) );
1311    assert( peer->clientIsInterested );
1312    assert( !peer->clientIsChoked );
1313    assert( numwant > 0 );
1314
1315    /* walk through the pieces and find blocks that should be requested */
1316    got = 0;
1317    t = tor->torrentPeers;
1318
1319    /* prep the pieces list */
1320    if( t->pieces == NULL )
1321        pieceListRebuild( t );
1322
1323    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1324        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1325
1326    assertReplicationCountIsExact( t );
1327    assertWeightedPiecesAreSorted( t );
1328
1329    updateEndgame( t );
1330    pieces = t->pieces;
1331    for( i=0; i<t->pieceCount && got<numwant; ++i )
1332    {
1333        struct weighted_piece * p = pieces + i;
1334
1335        /* if the peer has this piece that we want... */
1336        if( tr_bitsetHas( have, p->index ) )
1337        {
1338            tr_block_index_t b;
1339            tr_block_index_t first;
1340            tr_block_index_t last;
1341            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1342
1343            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1344
1345            for( b=first; b<=last && got<numwant; ++b )
1346            {
1347                int peerCount;
1348                tr_peer ** peers;
1349
1350                /* don't request blocks we've already got */
1351                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1352                    continue;
1353
1354                /* always add peer if this block has no peers yet */
1355                tr_ptrArrayClear( &peerArr );
1356                getBlockRequestPeers( t, b, &peerArr );
1357                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1358                if( peerCount != 0 )
1359                {
1360                    /* don't make a second block request until the endgame */
1361                    if( !t->endgame )
1362                        continue;
1363
1364                    /* don't have more than two peers requesting this block */
1365                    if( peerCount > 1 )
1366                        continue;
1367
1368                    /* don't send the same request to the same peer twice */
1369                    if( peer == peers[0] )
1370                        continue;
1371
1372                    /* in the endgame allow an additional peer to download a
1373                       block but only if the peer seems to be handling requests
1374                       relatively fast */
1375                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1376                        continue;
1377                }
1378
1379                /* update the caller's table */
1380                setme[got++] = b;
1381
1382                /* update our own tables */
1383                requestListAdd( t, b, peer );
1384                ++p->requestCount;
1385            }
1386
1387            tr_ptrArrayDestruct( &peerArr, NULL );
1388        }
1389    }
1390
1391    /* In most cases we've just changed the weights of a small number of pieces.
1392     * So rather than qsort()ing the entire array, it's faster to apply an
1393     * adaptive insertion sort algorithm. */
1394    if( got > 0 )
1395    {
1396        /* not enough requests || last piece modified */
1397        if ( i == t->pieceCount ) --i;
1398
1399        setComparePieceByWeightTorrent( t );
1400        while( --i >= 0 )
1401        {
1402            tr_bool exact;
1403
1404            /* relative position! */
1405            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1406                                              t->pieceCount - (i + 1),
1407                                              sizeof( struct weighted_piece ),
1408                                              comparePieceByWeight, &exact );
1409            if( newpos > 0 )
1410            {
1411                const struct weighted_piece piece = t->pieces[i];
1412                memmove( &t->pieces[i],
1413                         &t->pieces[i + 1],
1414                         sizeof( struct weighted_piece ) * ( newpos ) );
1415                t->pieces[i + newpos] = piece;
1416            }
1417        }
1418    }
1419
1420    assertWeightedPiecesAreSorted( t );
1421    *numgot = got;
1422}
1423
1424tr_bool
1425tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1426                          const tr_peer     * peer,
1427                          tr_block_index_t    block )
1428{
1429    const Torrent * t = tor->torrentPeers;
1430    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1431}
1432
1433/* cancel requests that are too old */
1434static void
1435refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1436{
1437    time_t now;
1438    time_t too_old;
1439    tr_torrent * tor;
1440    tr_peerMgr * mgr = vmgr;
1441    managerLock( mgr );
1442
1443    now = tr_time( );
1444    too_old = now - REQUEST_TTL_SECS;
1445
1446    tor = NULL;
1447    while(( tor = tr_torrentNext( mgr->session, tor )))
1448    {
1449        Torrent * t = tor->torrentPeers;
1450        const int n = t->requestCount;
1451        if( n > 0 )
1452        {
1453            int keepCount = 0;
1454            int cancelCount = 0;
1455            struct block_request * cancel = tr_new( struct block_request, n );
1456            const struct block_request * it;
1457            const struct block_request * end;
1458
1459            for( it=t->requests, end=it+n; it!=end; ++it )
1460            {
1461                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1462                    cancel[cancelCount++] = *it;
1463                else
1464                {
1465                    if( it != &t->requests[keepCount] )
1466                        t->requests[keepCount] = *it;
1467                    keepCount++;
1468                }
1469            }
1470
1471            /* prune out the ones we aren't keeping */
1472            t->requestCount = keepCount;
1473
1474            /* send cancel messages for all the "cancel" ones */
1475            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1476                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1477                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1478                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1479                    decrementPendingReqCount( it );
1480                }
1481            }
1482
1483            /* decrement the pending request counts for the timed-out blocks */
1484            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1485                pieceListRemoveRequest( t, it->block );
1486
1487            /* cleanup loop */
1488            tr_free( cancel );
1489        }
1490    }
1491
1492    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1493    managerUnlock( mgr );
1494}
1495
1496static void
1497addStrike( Torrent * t, tr_peer * peer )
1498{
1499    tordbg( t, "increasing peer %s strike count to %d",
1500            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1501
1502    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1503    {
1504        struct peer_atom * atom = peer->atom;
1505        atom->flags2 |= MYFLAG_BANNED;
1506        peer->doPurge = 1;
1507        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1508    }
1509}
1510
1511static void
1512gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1513{
1514    tr_torrent *   tor = t->tor;
1515    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1516
1517    tor->corruptCur += byteCount;
1518    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1519
1520    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1521}
1522
1523static void
1524peerSuggestedPiece( Torrent            * t UNUSED,
1525                    tr_peer            * peer UNUSED,
1526                    tr_piece_index_t     pieceIndex UNUSED,
1527                    int                  isFastAllowed UNUSED )
1528{
1529#if 0
1530    assert( t );
1531    assert( peer );
1532    assert( peer->msgs );
1533
1534    /* is this a valid piece? */
1535    if(  pieceIndex >= t->tor->info.pieceCount )
1536        return;
1537
1538    /* don't ask for it if we've already got it */
1539    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1540        return;
1541
1542    /* don't ask for it if they don't have it */
1543    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1544        return;
1545
1546    /* don't ask for it if we're choked and it's not fast */
1547    if( !isFastAllowed && peer->clientIsChoked )
1548        return;
1549
1550    /* request the blocks that we don't have in this piece */
1551    {
1552        tr_block_index_t b;
1553        tr_block_index_t first;
1554        tr_block_index_t last;
1555        const tr_torrent * tor = t->tor;
1556
1557        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1558
1559        for( b=first; b<=last; ++b )
1560        {
1561            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1562            {
1563                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1564                const uint32_t length = tr_torBlockCountBytes( tor, b );
1565                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1566                incrementPieceRequests( t, pieceIndex );
1567            }
1568        }
1569    }
1570#endif
1571}
1572
1573static void
1574removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1575{
1576    requestListRemove( t, block, peer );
1577    pieceListRemoveRequest( t, block );
1578}
1579
1580/* peer choked us, or maybe it disconnected.
1581   either way we need to remove all its requests */
1582static void
1583peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1584{
1585    int i, n;
1586    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1587
1588    for( i=n=0; i<t->requestCount; ++i )
1589        if( peer == t->requests[i].peer )
1590            blocks[n++] = t->requests[i].block;
1591
1592    for( i=0; i<n; ++i )
1593        removeRequestFromTables( t, blocks[i], peer );
1594
1595    tr_free( blocks );
1596}
1597
1598static void
1599peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1600{
1601    Torrent * t = vt;
1602
1603    torrentLock( t );
1604
1605    switch( e->eventType )
1606    {
1607        case TR_PEER_PEER_GOT_DATA:
1608        {
1609            const time_t now = tr_time( );
1610            tr_torrent * tor = t->tor;
1611
1612            if( e->wasPieceData )
1613            {
1614                tor->uploadedCur += e->length;
1615                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1616                tr_torrentSetActivityDate( tor, now );
1617                tr_torrentSetDirty( tor );
1618            }
1619
1620            /* update the stats */
1621            if( e->wasPieceData )
1622                tr_statsAddUploaded( tor->session, e->length );
1623
1624            /* update our atom */
1625            if( peer && e->wasPieceData )
1626                peer->atom->piece_data_time = now;
1627
1628            break;
1629        }
1630
1631        case TR_PEER_CLIENT_GOT_HAVE:
1632            if( replicationExists( t ) ) {
1633                tr_incrReplicationOfPiece( t, e->pieceIndex );
1634                assertReplicationCountIsExact( t );
1635            }
1636            break;
1637
1638        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1639            if( replicationExists( t ) ) {
1640                tr_incrReplication( t );
1641                assertReplicationCountIsExact( t );
1642            }
1643            break;
1644
1645        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1646            /* noop */
1647            break;
1648
1649        case TR_PEER_CLIENT_GOT_BITFIELD:
1650            assert( e->bitfield != NULL );
1651            if( replicationExists( t ) ) {
1652                tr_incrReplicationFromBitfield( t, e->bitfield );
1653                assertReplicationCountIsExact( t );
1654            }
1655            break;
1656
1657        case TR_PEER_CLIENT_GOT_REJ:
1658            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1659            break;
1660
1661        case TR_PEER_CLIENT_GOT_CHOKE:
1662            peerDeclinedAllRequests( t, peer );
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_PORT:
1666            if( peer )
1667                peer->atom->port = e->port;
1668            break;
1669
1670        case TR_PEER_CLIENT_GOT_SUGGEST:
1671            if( peer )
1672                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1673            break;
1674
1675        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1676            if( peer )
1677                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1678            break;
1679
1680        case TR_PEER_CLIENT_GOT_DATA:
1681        {
1682            const time_t now = tr_time( );
1683            tr_torrent * tor = t->tor;
1684
1685            if( e->wasPieceData )
1686            {
1687                tor->downloadedCur += e->length;
1688                tr_torrentSetActivityDate( tor, now );
1689                tr_torrentSetDirty( tor );
1690            }
1691
1692            /* update the stats */
1693            if( e->wasPieceData )
1694                tr_statsAddDownloaded( tor->session, e->length );
1695
1696            /* update our atom */
1697            if( peer && peer->atom && e->wasPieceData )
1698                peer->atom->piece_data_time = now;
1699
1700            break;
1701        }
1702
1703        case TR_PEER_PEER_PROGRESS:
1704        {
1705            if( peer )
1706            {
1707                struct peer_atom * atom = peer->atom;
1708                if( e->progress >= 1.0 ) {
1709                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1710                    atomSetSeed( atom );
1711                }
1712            }
1713            break;
1714        }
1715
1716        case TR_PEER_CLIENT_GOT_BLOCK:
1717        {
1718            tr_torrent * tor = t->tor;
1719            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1720            int i, peerCount;
1721            tr_peer ** peers;
1722            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1723
1724            removeRequestFromTables( t, block, peer );
1725            getBlockRequestPeers( t, block, &peerArr );
1726            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1727
1728            /* remove additional block requests and send cancel to peers */
1729            for( i=0; i<peerCount; i++ ) {
1730                tr_peer * p = peers[i];
1731                assert( p != peer );
1732                if( p->msgs ) {
1733                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1734                    tr_peerMsgsCancel( p->msgs, block );
1735                }
1736                removeRequestFromTables( t, block, p );
1737            }
1738
1739            tr_ptrArrayDestruct( &peerArr, FALSE );
1740
1741            if( peer && peer->blocksSentToClient )
1742                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1743
1744            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1745            {
1746                /* we already have this block... */
1747                const uint32_t n = tr_torBlockCountBytes( tor, block );
1748                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1749                tordbg( t, "we have this block already..." );
1750            }
1751            else
1752            {
1753                tr_cpBlockAdd( &tor->completion, block );
1754                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1755                tr_torrentSetDirty( tor );
1756
1757                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1758                {
1759                    const tr_piece_index_t p = e->pieceIndex;
1760                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1761
1762                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1763
1764                    if( !ok )
1765                    {
1766                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1767                                   (unsigned long)p );
1768                    }
1769
1770                    tr_peerMgrSetBlame( tor, p, ok );
1771
1772                    if( !ok )
1773                    {
1774                        gotBadPiece( t, p );
1775                    }
1776                    else
1777                    {
1778                        int i;
1779                        int peerCount;
1780                        tr_peer ** peers;
1781                        tr_file_index_t fileIndex;
1782
1783                        /* only add this to downloadedCur if we got it from a peer --
1784                         * webseeds shouldn't count against our ratio. As one tracker
1785                         * admin put it, "Those pieces are downloaded directly from the
1786                         * content distributor, not the peers, it is the tracker's job
1787                         * to manage the swarms, not the web server and does not fit
1788                         * into the jurisdiction of the tracker." */
1789                        if( peer->msgs != NULL ) {
1790                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1791                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1792                        }
1793
1794                        peerCount = tr_ptrArraySize( &t->peers );
1795                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1796                        for( i=0; i<peerCount; ++i )
1797                            tr_peerMsgsHave( peers[i]->msgs, p );
1798
1799                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1800                            const tr_file * file = &tor->info.files[fileIndex];
1801                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1802                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1803                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1804                                    tr_torrentFileCompleted( tor, fileIndex );
1805                                }
1806                            }
1807                        }
1808
1809                        pieceListRemovePiece( t, p );
1810                    }
1811                }
1812
1813                t->needsCompletenessCheck = TRUE;
1814            }
1815            break;
1816        }
1817
1818        case TR_PEER_ERROR:
1819            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1820            {
1821                /* some protocol error from the peer */
1822                peer->doPurge = 1;
1823                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1824                        tr_atomAddrStr( peer->atom ) );
1825            }
1826            else
1827            {
1828                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1829            }
1830            break;
1831
1832        default:
1833            assert( 0 );
1834    }
1835
1836    torrentUnlock( t );
1837}
1838
1839static int
1840getDefaultShelfLife( uint8_t from )
1841{
1842    /* in general, peers obtained from firsthand contact
1843     * are better than those from secondhand, etc etc */
1844    switch( from )
1845    {
1846        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1847        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1848        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1849        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1850        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1851        case TR_PEER_FROM_RESUME   : return 60 * 60;
1852        case TR_PEER_FROM_LPD      : return 10 * 60;
1853        default                    : return 60 * 60;
1854    }
1855}
1856
1857static void
1858ensureAtomExists( Torrent           * t,
1859                  const tr_address  * addr,
1860                  const tr_port       port,
1861                  const uint8_t       flags,
1862                  const int8_t        seedProbability,
1863                  const uint8_t       from )
1864{
1865    struct peer_atom * a;
1866
1867    assert( tr_isAddress( addr ) );
1868    assert( from < TR_PEER_FROM__MAX );
1869
1870    a = getExistingAtom( t, addr );
1871
1872    if( a == NULL )
1873    {
1874        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1875        a = tr_new0( struct peer_atom, 1 );
1876        a->addr = *addr;
1877        a->port = port;
1878        a->flags = flags;
1879        a->fromFirst = from;
1880        a->fromBest = from;
1881        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1882        a->blocklisted = -1;
1883        atomSetSeedProbability( a, seedProbability );
1884        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1885
1886        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1887    }
1888    else
1889    {
1890        if( from < a->fromBest )
1891            a->fromBest = from;
1892       
1893        if( a->seedProbability == -1 )
1894            atomSetSeedProbability( a, seedProbability );
1895
1896        a->flags |= flags;
1897    }
1898}
1899
1900static int
1901getMaxPeerCount( const tr_torrent * tor )
1902{
1903    return tor->maxConnectedPeers;
1904}
1905
1906static int
1907getPeerCount( const Torrent * t )
1908{
1909    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1910}
1911
1912/* FIXME: this is kind of a mess. */
1913static tr_bool
1914myHandshakeDoneCB( tr_handshake  * handshake,
1915                   tr_peerIo     * io,
1916                   tr_bool         readAnythingFromPeer,
1917                   tr_bool         isConnected,
1918                   const uint8_t * peer_id,
1919                   void          * vmanager )
1920{
1921    tr_bool            ok = isConnected;
1922    tr_bool            success = FALSE;
1923    tr_port            port;
1924    const tr_address * addr;
1925    tr_peerMgr       * manager = vmanager;
1926    Torrent          * t;
1927    tr_handshake     * ours;
1928
1929    assert( io );
1930    assert( tr_isBool( ok ) );
1931
1932    t = tr_peerIoHasTorrentHash( io )
1933        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1934        : NULL;
1935
1936    if( tr_peerIoIsIncoming ( io ) )
1937        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1938                                        handshake, handshakeCompare );
1939    else if( t )
1940        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1941                                        handshake, handshakeCompare );
1942    else
1943        ours = handshake;
1944
1945    assert( ours );
1946    assert( ours == handshake );
1947
1948    if( t )
1949        torrentLock( t );
1950
1951    addr = tr_peerIoGetAddress( io, &port );
1952
1953    if( !ok || !t || !t->isRunning )
1954    {
1955        if( t )
1956        {
1957            struct peer_atom * atom = getExistingAtom( t, addr );
1958            if( atom )
1959            {
1960                ++atom->numFails;
1961
1962                if( !readAnythingFromPeer )
1963                {
1964                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1965                    atom->flags2 |= MYFLAG_UNREACHABLE;
1966                }
1967            }
1968        }
1969    }
1970    else /* looking good */
1971    {
1972        struct peer_atom * atom;
1973
1974        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1975        atom = getExistingAtom( t, addr );
1976        atom->time = tr_time( );
1977        atom->piece_data_time = 0;
1978        atom->lastConnectionAt = tr_time( );
1979
1980        if( !tr_peerIoIsIncoming( io ) )
1981        {
1982            atom->flags |= ADDED_F_CONNECTABLE;
1983            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1984        }
1985
1986        /* In principle, this flag specifies whether the peer groks uTP,
1987           not whether it's currently connected over uTP. */
1988        if( io->utp_socket )
1989            atom->flags |= ADDED_F_UTP_FLAGS;
1990
1991        if( atom->flags2 & MYFLAG_BANNED )
1992        {
1993            tordbg( t, "banned peer %s tried to reconnect",
1994                    tr_atomAddrStr( atom ) );
1995        }
1996        else if( tr_peerIoIsIncoming( io )
1997               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1998
1999        {
2000        }
2001        else
2002        {
2003            tr_peer * peer = atom->peer;
2004
2005            if( peer ) /* we already have this peer */
2006            {
2007            }
2008            else
2009            {
2010                peer = getPeer( t, atom );
2011                tr_free( peer->client );
2012
2013                if( !peer_id )
2014                    peer->client = NULL;
2015                else {
2016                    char client[128];
2017                    tr_clientForId( client, sizeof( client ), peer_id );
2018                    peer->client = tr_strdup( client );
2019                }
2020
2021                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2022                                                                balanced by our unref in peerDestructor()  */
2023                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2024                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2025
2026                success = TRUE;
2027            }
2028        }
2029    }
2030
2031    if( t )
2032        torrentUnlock( t );
2033
2034    return success;
2035}
2036
2037void
2038tr_peerMgrAddIncoming( tr_peerMgr * manager,
2039                       tr_address * addr,
2040                       tr_port      port,
2041                       int          socket,
2042                       struct UTPSocket * utp_socket )
2043{
2044    tr_session * session;
2045
2046    managerLock( manager );
2047
2048    assert( tr_isSession( manager->session ) );
2049    session = manager->session;
2050
2051    if( tr_sessionIsAddressBlocked( session, addr ) )
2052    {
2053        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2054        if(socket >= 0)
2055            tr_netClose( session, socket );
2056        else
2057            UTP_Close( utp_socket );
2058    }
2059    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2060    {
2061        if(socket >= 0)
2062            tr_netClose( session, socket );
2063        else
2064            UTP_Close( utp_socket );
2065    }
2066    else /* we don't have a connection to them yet... */
2067    {
2068        tr_peerIo *    io;
2069        tr_handshake * handshake;
2070
2071        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2072
2073        handshake = tr_handshakeNew( io,
2074                                     session->encryptionMode,
2075                                     myHandshakeDoneCB,
2076                                     manager );
2077
2078        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2079
2080        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2081                                 handshakeCompare );
2082    }
2083
2084    managerUnlock( manager );
2085}
2086
2087void
2088tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2089                  const tr_pex * pex, int8_t seedProbability )
2090{
2091    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2092    {
2093        Torrent * t = tor->torrentPeers;
2094        managerLock( t->manager );
2095
2096        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2097            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2098                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2099
2100        managerUnlock( t->manager );
2101    }
2102}
2103
2104void
2105tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2106{
2107    Torrent * t = tor->torrentPeers;
2108    const int n = tr_ptrArraySize( &t->pool );
2109    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2110    struct peer_atom ** end = it + n;
2111
2112    while( it != end )
2113        atomSetSeed( *it++ );
2114}
2115
2116tr_pex *
2117tr_peerMgrCompactToPex( const void *    compact,
2118                        size_t          compactLen,
2119                        const uint8_t * added_f,
2120                        size_t          added_f_len,
2121                        size_t *        pexCount )
2122{
2123    size_t          i;
2124    size_t          n = compactLen / 6;
2125    const uint8_t * walk = compact;
2126    tr_pex *        pex = tr_new0( tr_pex, n );
2127
2128    for( i = 0; i < n; ++i )
2129    {
2130        pex[i].addr.type = TR_AF_INET;
2131        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2132        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2133        if( added_f && ( n == added_f_len ) )
2134            pex[i].flags = added_f[i];
2135    }
2136
2137    *pexCount = n;
2138    return pex;
2139}
2140
2141tr_pex *
2142tr_peerMgrCompact6ToPex( const void    * compact,
2143                         size_t          compactLen,
2144                         const uint8_t * added_f,
2145                         size_t          added_f_len,
2146                         size_t        * pexCount )
2147{
2148    size_t          i;
2149    size_t          n = compactLen / 18;
2150    const uint8_t * walk = compact;
2151    tr_pex *        pex = tr_new0( tr_pex, n );
2152
2153    for( i = 0; i < n; ++i )
2154    {
2155        pex[i].addr.type = TR_AF_INET6;
2156        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2157        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2158        if( added_f && ( n == added_f_len ) )
2159            pex[i].flags = added_f[i];
2160    }
2161
2162    *pexCount = n;
2163    return pex;
2164}
2165
2166tr_pex *
2167tr_peerMgrArrayToPex( const void * array,
2168                      size_t       arrayLen,
2169                      size_t      * pexCount )
2170{
2171    size_t          i;
2172    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2173    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2174    const uint8_t * walk = array;
2175    tr_pex        * pex = tr_new0( tr_pex, n );
2176
2177    for( i = 0 ; i < n ; i++ ) {
2178        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2179        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2180        pex[i].flags = 0x00;
2181        walk += sizeof( tr_address ) + 2;
2182    }
2183
2184    *pexCount = n;
2185    return pex;
2186}
2187
2188/**
2189***
2190**/
2191
2192void
2193tr_peerMgrSetBlame( tr_torrent     * tor,
2194                    tr_piece_index_t pieceIndex,
2195                    int              success )
2196{
2197    if( !success )
2198    {
2199        int        peerCount, i;
2200        Torrent *  t = tor->torrentPeers;
2201        tr_peer ** peers;
2202
2203        assert( torrentIsLocked( t ) );
2204
2205        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2206        for( i = 0; i < peerCount; ++i )
2207        {
2208            tr_peer * peer = peers[i];
2209            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2210            {
2211                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2212                        tr_atomAddrStr( peer->atom ),
2213                        pieceIndex, (int)peer->strikes + 1 );
2214                addStrike( t, peer );
2215            }
2216        }
2217    }
2218}
2219
2220int
2221tr_pexCompare( const void * va, const void * vb )
2222{
2223    const tr_pex * a = va;
2224    const tr_pex * b = vb;
2225    int i;
2226
2227    assert( tr_isPex( a ) );
2228    assert( tr_isPex( b ) );
2229
2230    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2231        return i;
2232
2233    if( a->port != b->port )
2234        return a->port < b->port ? -1 : 1;
2235
2236    return 0;
2237}
2238
#if 0
/* Disabled code: an explicit encryption preference wins; otherwise report
 * whether the peer's current connection is encrypted. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    switch( peer->encryption_preference )
    {
        case ENCRYPTION_PREFERENCE_YES:
            return TRUE;

        case ENCRYPTION_PREFERENCE_NO:
            return FALSE;

        default:
            return tr_peerIoIsEncrypted( peer->io );
    }
}
#endif
2252
2253/* better goes first */
2254static int
2255compareAtomsByUsefulness( const void * va, const void *vb )
2256{
2257    const struct peer_atom * a = * (const struct peer_atom**) va;
2258    const struct peer_atom * b = * (const struct peer_atom**) vb;
2259
2260    assert( tr_isAtom( a ) );
2261    assert( tr_isAtom( b ) );
2262
2263    if( a->piece_data_time != b->piece_data_time )
2264        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2265    if( a->fromBest != b->fromBest )
2266        return a->fromBest < b->fromBest ? -1 : 1;
2267    if( a->numFails != b->numFails )
2268        return a->numFails < b->numFails ? -1 : 1;
2269
2270    return 0;
2271}
2272
2273int
2274tr_peerMgrGetPeers( tr_torrent   * tor,
2275                    tr_pex      ** setme_pex,
2276                    uint8_t        af,
2277                    uint8_t        list_mode,
2278                    int            maxCount )
2279{
2280    int i;
2281    int n;
2282    int count = 0;
2283    int atomCount = 0;
2284    const Torrent * t = tor->torrentPeers;
2285    struct peer_atom ** atoms = NULL;
2286    tr_pex * pex;
2287    tr_pex * walk;
2288
2289    assert( tr_isTorrent( tor ) );
2290    assert( setme_pex != NULL );
2291    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2292    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2293
2294    managerLock( t->manager );
2295
2296    /**
2297    ***  build a list of atoms
2298    **/
2299
2300    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2301    {
2302        int i;
2303        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2304        atomCount = tr_ptrArraySize( &t->peers );
2305        atoms = tr_new( struct peer_atom *, atomCount );
2306        for( i=0; i<atomCount; ++i )
2307            atoms[i] = peers[i]->atom;
2308    }
2309    else /* TR_PEERS_ALL */
2310    {
2311        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2312        atomCount = tr_ptrArraySize( &t->pool );
2313        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2314    }
2315
2316    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2317
2318    /**
2319    ***  add the first N of them into our return list
2320    **/
2321
2322    n = MIN( atomCount, maxCount );
2323    pex = walk = tr_new0( tr_pex, n );
2324
2325    for( i=0; i<atomCount && count<n; ++i )
2326    {
2327        const struct peer_atom * atom = atoms[i];
2328        if( atom->addr.type == af )
2329        {
2330            assert( tr_isAddress( &atom->addr ) );
2331            walk->addr = atom->addr;
2332            walk->port = atom->port;
2333            walk->flags = atom->flags;
2334            ++count;
2335            ++walk;
2336        }
2337    }
2338
2339    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2340
2341    assert( ( walk - pex ) == count );
2342    *setme_pex = pex;
2343
2344    /* cleanup */
2345    tr_free( atoms );
2346    managerUnlock( t->manager );
2347    return count;
2348}
2349
2350static void atomPulse      ( int, short, void * );
2351static void bandwidthPulse ( int, short, void * );
2352static void rechokePulse   ( int, short, void * );
2353static void reconnectPulse ( int, short, void * );
2354
2355static struct event *
2356createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2357{
2358    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2359    tr_timerAddMsec( timer, msec );
2360    return timer;
2361}
2362
2363static void
2364ensureMgrTimersExist( struct tr_peerMgr * m )
2365{
2366    if( m->atomTimer == NULL )
2367        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2368
2369    if( m->bandwidthTimer == NULL )
2370        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2371
2372    if( m->rechokeTimer == NULL )
2373        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2374
2375    if( m->refillUpkeepTimer == NULL )
2376        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2377}
2378
2379void
2380tr_peerMgrStartTorrent( tr_torrent * tor )
2381{
2382    Torrent * t = tor->torrentPeers;
2383
2384    assert( tr_isTorrent( tor ) );
2385    assert( tr_torrentIsLocked( tor ) );
2386
2387    ensureMgrTimersExist( t->manager );
2388
2389    t->isRunning = TRUE;
2390    t->maxPeers = t->tor->maxConnectedPeers;
2391    t->pieceSortState = PIECES_UNSORTED;
2392
2393    rechokePulse( 0, 0, t->manager );
2394}
2395
2396static void
2397stopTorrent( Torrent * t )
2398{
2399    int i, n;
2400
2401    t->isRunning = FALSE;
2402
2403    replicationFree( t );
2404    invalidatePieceSorting( t );
2405
2406    /* disconnect the peers. */
2407    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2408        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2409    tr_ptrArrayClear( &t->peers );
2410
2411    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2412     * which removes the handshake from t->outgoingHandshakes... */
2413    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2414        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2415}
2416
2417void
2418tr_peerMgrStopTorrent( tr_torrent * tor )
2419{
2420    assert( tr_isTorrent( tor ) );
2421    assert( tr_torrentIsLocked( tor ) );
2422
2423    stopTorrent( tor->torrentPeers );
2424}
2425
2426void
2427tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2428{
2429    assert( tr_isTorrent( tor ) );
2430    assert( tr_torrentIsLocked( tor ) );
2431    assert( tor->torrentPeers == NULL );
2432
2433    tor->torrentPeers = torrentConstructor( manager, tor );
2434}
2435
2436void
2437tr_peerMgrRemoveTorrent( tr_torrent * tor )
2438{
2439    assert( tr_isTorrent( tor ) );
2440    assert( tr_torrentIsLocked( tor ) );
2441
2442    stopTorrent( tor->torrentPeers );
2443    torrentDestructor( tor->torrentPeers );
2444}
2445
2446void
2447tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2448{
2449    assert( tr_isTorrent( tor ) );
2450    assert( torrentIsLocked( tor->torrentPeers ) );
2451    assert( tab != NULL );
2452    assert( tabCount > 0 );
2453
2454    memset( tab, 0, tabCount );
2455
2456    if( tr_torrentHasMetadata( tor ) )
2457    {
2458        tr_piece_index_t i;
2459        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2460        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2461        const float interval = tor->info.pieceCount / (float)tabCount;
2462        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2463
2464        for( i=0; i<tabCount; ++i )
2465        {
2466            const int piece = i * interval;
2467
2468            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2469                tab[i] = -1;
2470            else if( peerCount ) {
2471                int j;
2472                for( j=0; j<peerCount; ++j )
2473                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2474                        ++tab[i];
2475            }
2476        }
2477    }
2478}
2479
2480/* Returns the pieces that are available from peers */
2481tr_bitfield*
2482tr_peerMgrGetAvailable( const tr_torrent * tor )
2483{
2484    int i;
2485    Torrent * t = tor->torrentPeers;
2486    const int peerCount = tr_ptrArraySize( &t->peers );
2487    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2488    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2489
2490    assert( tr_torrentIsLocked( tor ) );
2491
2492    for( i=0; i<peerCount; ++i )
2493        tr_bitsetOr( pieces, &peers[i]->have );
2494
2495    return pieces;
2496}
2497
2498void
2499tr_peerMgrTorrentStats( tr_torrent  * tor,
2500                        int         * setmePeersKnown,
2501                        int         * setmePeersConnected,
2502                        int         * setmeSeedsConnected,
2503                        int         * setmeWebseedsSendingToUs,
2504                        int         * setmePeersSendingToUs,
2505                        int         * setmePeersGettingFromUs,
2506                        int         * setmePeersFrom )
2507{
2508    int i, size;
2509    const Torrent * t = tor->torrentPeers;
2510    const tr_peer ** peers;
2511
2512    assert( tr_torrentIsLocked( tor ) );
2513
2514    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2515    size = tr_ptrArraySize( &t->peers );
2516
2517    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2518    *setmePeersConnected       = 0;
2519    *setmeSeedsConnected       = 0;
2520    *setmePeersGettingFromUs   = 0;
2521    *setmePeersSendingToUs     = 0;
2522    *setmeWebseedsSendingToUs  = 0;
2523
2524    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2525        setmePeersFrom[i] = 0;
2526
2527    for( i=0; i<size; ++i )
2528    {
2529        const tr_peer * peer = peers[i];
2530        const struct peer_atom * atom = peer->atom;
2531
2532        if( peer->io == NULL ) /* not connected */
2533            continue;
2534
2535        ++*setmePeersConnected;
2536
2537        ++setmePeersFrom[atom->fromFirst];
2538
2539        if( clientIsDownloadingFrom( tor, peer ) )
2540            ++*setmePeersSendingToUs;
2541
2542        if( clientIsUploadingTo( peer ) )
2543            ++*setmePeersGettingFromUs;
2544
2545        if( atomIsSeed( atom ) )
2546            ++*setmeSeedsConnected;
2547    }
2548
2549    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2550}
2551
2552int
2553tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2554{
2555    int i;
2556    int tmp;
2557    int ret = 0;
2558
2559    const Torrent * t = tor->torrentPeers;
2560    const int n = tr_ptrArraySize( &t->webseeds );
2561    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2562
2563    for( i=0; i<n; ++i )
2564        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2565            ret += tmp;
2566
2567    return ret;
2568}
2569
2570
2571double*
2572tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2573{
2574    int i;
2575    const Torrent * t = tor->torrentPeers;
2576    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2577    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2578    const uint64_t now = tr_time_msec( );
2579    double * ret = tr_new0( double, webseedCount );
2580
2581    assert( tr_isTorrent( tor ) );
2582    assert( tr_torrentIsLocked( tor ) );
2583    assert( t->manager != NULL );
2584    assert( webseedCount == tor->info.webseedCount );
2585
2586    for( i=0; i<webseedCount; ++i ) {
2587        int Bps;
2588        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2589            ret[i] = Bps / (double)tr_speed_K;
2590        else
2591            ret[i] = -1.0;
2592    }
2593
2594    return ret;
2595}
2596
2597int
2598tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2599{
2600    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2601}
2602
2603
2604struct tr_peer_stat *
2605tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2606{
2607    int i;
2608    const Torrent * t = tor->torrentPeers;
2609    const int size = tr_ptrArraySize( &t->peers );
2610    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2611    const uint64_t now_msec = tr_time_msec( );
2612    const time_t now = tr_time();
2613    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2614
2615    assert( tr_isTorrent( tor ) );
2616    assert( tr_torrentIsLocked( tor ) );
2617    assert( t->manager );
2618
2619    for( i=0; i<size; ++i )
2620    {
2621        char *                   pch;
2622        const tr_peer *          peer = peers[i];
2623        const struct peer_atom * atom = peer->atom;
2624        tr_peer_stat *           stat = ret + i;
2625
2626        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2627        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2628                   sizeof( stat->client ) );
2629        stat->port                = ntohs( peer->atom->port );
2630        stat->from                = atom->fromFirst;
2631        stat->progress            = peer->progress;
2632        stat->isUTP               = peer->io->utp_socket != NULL;
2633        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2634        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2635        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2636        stat->peerIsChoked        = peer->peerIsChoked;
2637        stat->peerIsInterested    = peer->peerIsInterested;
2638        stat->clientIsChoked      = peer->clientIsChoked;
2639        stat->clientIsInterested  = peer->clientIsInterested;
2640        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2641        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2642        stat->isUploadingTo       = clientIsUploadingTo( peer );
2643        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2644
2645        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2646        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2647        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2648        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2649
2650        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2651        stat->pendingReqsToClient = peer->pendingReqsToClient;
2652
2653        pch = stat->flagStr;
2654        if( stat->isUTP ) *pch++ = 'T';
2655        if( t->optimistic == peer ) *pch++ = 'O';
2656        if( stat->isDownloadingFrom ) *pch++ = 'D';
2657        else if( stat->clientIsInterested ) *pch++ = 'd';
2658        if( stat->isUploadingTo ) *pch++ = 'U';
2659        else if( stat->peerIsInterested ) *pch++ = 'u';
2660        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2661        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2662        if( stat->isEncrypted ) *pch++ = 'E';
2663        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2664        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2665        if( stat->isIncoming ) *pch++ = 'I';
2666        *pch = '\0';
2667    }
2668
2669    *setmeCount = size;
2670
2671    return ret;
2672}
2673
2674/**
2675***
2676**/
2677
2678void
2679tr_peerMgrClearInterest( tr_torrent * tor )
2680{
2681    int i;
2682    Torrent * t = tor->torrentPeers;
2683    const int peerCount = tr_ptrArraySize( &t->peers );
2684
2685    assert( tr_isTorrent( tor ) );
2686    assert( tr_torrentIsLocked( tor ) );
2687
2688    for( i=0; i<peerCount; ++i )
2689    {
2690        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2691        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2692    }
2693}
2694
2695/* do we still want this piece and does the peer have it? */
2696static tr_bool
2697isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2698{
2699    return ( !tor->info.pieces[index].dnd ) /* we want it */
2700        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2701        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2702}
2703
2704/* does this peer have any pieces that we want? */
2705static tr_bool
2706isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2707{
2708    tr_piece_index_t i, n;
2709
2710    if ( tr_torrentIsSeed( tor ) )
2711        return FALSE;
2712
2713    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2714        return FALSE;
2715
2716    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2717        if( isPieceInteresting( tor, peer, i ) )
2718            return TRUE;
2719
2720    return FALSE;
2721}
2722
2723/* determines who we send "interested" messages to */
2724static void
2725rechokeDownloads( Torrent * t )
2726{
2727    int i;
2728    const time_t now = tr_time( );
2729    const int MIN_INTERESTING_PEERS = 5;
2730    const int peerCount = tr_ptrArraySize( &t->peers );
2731    int maxPeers;
2732
2733    int badCount         = 0;
2734    int goodCount        = 0;
2735    int untestedCount    = 0;
2736    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2737    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2738    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2739
2740    /* decide how many peers to be interested in */
2741    {
2742        int blocks = 0;
2743        int cancels = 0;
2744        time_t timeSinceCancel;
2745
2746        /* Count up how many blocks & cancels each peer has.
2747         *
2748         * There are two situations where we send out cancels --
2749         *
2750         * 1. We've got unresponsive peers, which is handled by deciding
2751         *    -which- peers to be interested in.
2752         *
2753         * 2. We've hit our bandwidth cap, which is handled by deciding
2754         *    -how many- peers to be interested in.
2755         *
2756         * We're working on 2. here, so we need to ignore unresponsive
2757         * peers in our calculations lest they confuse Transmission into
2758         * thinking it's hit its bandwidth cap.
2759         */
2760        for( i=0; i<peerCount; ++i )
2761        {
2762            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2763            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2764            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2765
2766            if( b == 0 ) /* ignore unresponsive peers, as described above */
2767                continue;
2768
2769            blocks += b;
2770            cancels += c;
2771        }
2772
2773        if( cancels > 0 )
2774        {
2775            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2776             * higher values indicate more congestion. */
2777            const double cancelRate = cancels / (double)(cancels + blocks);
2778            const double mult = 1 - MIN( cancelRate, 0.5 );
2779            maxPeers = t->interestedCount * mult;
2780            tordbg( t, "cancel rate is %.3f -- reducing the "
2781                       "number of peers we're interested in by %.0f percent",
2782                       cancelRate, mult * 100 );
2783            t->lastCancel = now;
2784        }
2785
2786        timeSinceCancel = now - t->lastCancel;
2787        if( timeSinceCancel )
2788        {
2789            const int maxIncrease = 15;
2790            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2791            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2792            const int inc = maxIncrease * mult;
2793            maxPeers = t->maxPeers + inc;
2794            tordbg( t, "time since last cancel is %li -- increasing the "
2795                       "number of peers we're interested in by %d",
2796                       timeSinceCancel, inc );
2797        }
2798    }
2799
2800    /* don't let the previous section's number tweaking go too far... */
2801    if( maxPeers < MIN_INTERESTING_PEERS )
2802        maxPeers = MIN_INTERESTING_PEERS;
2803    if( maxPeers > t->tor->maxConnectedPeers )
2804        maxPeers = t->tor->maxConnectedPeers;
2805
2806    t->maxPeers = maxPeers;
2807
2808    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2809     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2810     * That's the order in which we'll choose who to show interest in */
2811    {
2812        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2813        int n = peerCount;
2814        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2815
2816        while( n > 0 )
2817        {
2818            const int i = tr_cryptoWeakRandInt( n );
2819            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2820
2821            if( !isPeerInteresting( t->tor, peer ) )
2822            {
2823                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2824            }
2825            else
2826            {
2827                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2828                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2829
2830                if( !blocks && !cancels )
2831                    untested[untestedCount++] = peer;
2832                else if( !cancels )
2833                    good[goodCount++] = peer;
2834                else if( !blocks )
2835                    bad[badCount++] = peer;
2836                else if( ( cancels * 10 ) < blocks )
2837                    good[goodCount++] = peer;
2838                else
2839                    bad[badCount++] = peer;
2840            }
2841
2842            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2843        }
2844
2845        tr_free( peers );
2846    }
2847
2848    t->interestedCount = 0;
2849
2850    /* We've decided (1) how many peers to be interested in,
2851     * and (2) which peers are the best candidates,
2852     * Now it's time to update our `interest' flags. */
2853    for( i=0; i<goodCount; ++i ) {
2854        const tr_bool b = t->interestedCount < maxPeers;
2855        tr_peerMsgsSetInterested( good[i]->msgs, b );
2856        if( b )
2857            ++t->interestedCount;
2858    }
2859    for( i=0; i<untestedCount; ++i ) {
2860        const tr_bool b = t->interestedCount < maxPeers;
2861        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2862        if( b )
2863            ++t->interestedCount;
2864    }
2865    for( i=0; i<badCount; ++i ) {
2866        const tr_bool b = t->interestedCount < maxPeers;
2867        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2868        if( b )
2869            ++t->interestedCount;
2870    }
2871
2872/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2873
2874    /* cleanup */
2875    tr_free( untested );
2876    tr_free( good );
2877    tr_free( bad );
2878}
2879
2880/**
2881***
2882**/
2883
2884struct ChokeData
2885{
2886    tr_bool         isInterested;
2887    tr_bool         wasChoked;
2888    tr_bool         isChoked;
2889    int             rate;
2890    int             salt;
2891    tr_peer *       peer;
2892};
2893
2894static int
2895compareChoke( const void * va,
2896              const void * vb )
2897{
2898    const struct ChokeData * a = va;
2899    const struct ChokeData * b = vb;
2900
2901    if( a->rate != b->rate ) /* prefer higher overall speeds */
2902        return a->rate > b->rate ? -1 : 1;
2903
2904    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2905        return a->wasChoked ? 1 : -1;
2906
2907    if( a->salt != b->salt ) /* random order */
2908        return a->salt - b->salt;
2909
2910    return 0;
2911}
2912
2913/* is this a new connection? */
2914static int
2915isNew( const tr_peer * peer )
2916{
2917    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2918}
2919
2920/* get a rate for deciding which peers to choke and unchoke. */
2921static int
2922getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2923{
2924    int Bps;
2925
2926    if( tr_torrentIsSeed( tor ) )
2927        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2928
2929    /* downloading a private torrent... take upload speed into account
2930     * because there may only be a small window of opportunity to share */
2931    else if( tr_torrentIsPrivate( tor ) )
2932        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2933            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2934
2935    /* downloading a public torrent */
2936    else
2937        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2938
2939    /* convert it to bytes per second */
2940    return Bps;
2941}
2942
2943static inline tr_bool
2944isBandwidthMaxedOut( const tr_bandwidth * b,
2945                     const uint64_t now_msec, tr_direction dir )
2946{
2947    if( !tr_bandwidthIsLimited( b, dir ) )
2948        return FALSE;
2949    else {
2950        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2951        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2952        return got >= want;
2953    }
2954}
2955
2956static void
2957rechokeUploads( Torrent * t, const uint64_t now )
2958{
2959    int i, size, unchokedInterested;
2960    const int peerCount = tr_ptrArraySize( &t->peers );
2961    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2962    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2963    const tr_session * session = t->manager->session;
2964    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2965    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2966
2967    assert( torrentIsLocked( t ) );
2968
2969    /* an optimistic unchoke peer's "optimistic"
2970     * state lasts for N calls to rechokeUploads(). */
2971    if( t->optimisticUnchokeTimeScaler > 0 )
2972        t->optimisticUnchokeTimeScaler--;
2973    else
2974        t->optimistic = NULL;
2975
2976    /* sort the peers by preference and rate */
2977    for( i = 0, size = 0; i < peerCount; ++i )
2978    {
2979        tr_peer * peer = peers[i];
2980        struct peer_atom * atom = peer->atom;
2981
2982        if( peer->progress >= 1.0 ) /* choke all seeds */
2983        {
2984            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2985        }
2986        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2987        {
2988            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2989        }
2990        else if( chokeAll ) /* choke everyone if we're not uploading */
2991        {
2992            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2993        }
2994        else if( peer != t->optimistic )
2995        {
2996            struct ChokeData * n = &choke[size++];
2997            n->peer         = peer;
2998            n->isInterested = peer->peerIsInterested;
2999            n->wasChoked    = peer->peerIsChoked;
3000            n->rate         = getRate( t->tor, atom, now );
3001            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3002            n->isChoked     = TRUE;
3003        }
3004    }
3005
3006    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3007
3008    /**
3009     * Reciprocation and number of uploads capping is managed by unchoking
3010     * the N peers which have the best upload rate and are interested.
3011     * This maximizes the client's download rate. These N peers are
3012     * referred to as downloaders, because they are interested in downloading
3013     * from the client.
3014     *
3015     * Peers which have a better upload rate (as compared to the downloaders)
3016     * but aren't interested get unchoked. If they become interested, the
3017     * downloader with the worst upload rate gets choked. If a client has
3018     * a complete file, it uses its upload rate rather than its download
3019     * rate to decide which peers to unchoke.
3020     *
3021     * If our bandwidth is maxed out, don't unchoke any more peers.
3022     */
3023    unchokedInterested = 0;
3024    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3025        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3026        if( choke[i].isInterested )
3027            ++unchokedInterested;
3028    }
3029
3030    /* optimistic unchoke */
3031    if( !t->optimistic && !isMaxedOut && (i<size) )
3032    {
3033        int n;
3034        struct ChokeData * c;
3035        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3036
3037        for( ; i<size; ++i )
3038        {
3039            if( choke[i].isInterested )
3040            {
3041                const tr_peer * peer = choke[i].peer;
3042                int x = 1, y;
3043                if( isNew( peer ) ) x *= 3;
3044                for( y=0; y<x; ++y )
3045                    tr_ptrArrayAppend( &randPool, &choke[i] );
3046            }
3047        }
3048
3049        if(( n = tr_ptrArraySize( &randPool )))
3050        {
3051            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3052            c->isChoked = FALSE;
3053            t->optimistic = c->peer;
3054            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3055        }
3056
3057        tr_ptrArrayDestruct( &randPool, NULL );
3058    }
3059
3060    for( i=0; i<size; ++i )
3061        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3062
3063    /* cleanup */
3064    tr_free( choke );
3065}
3066
3067static void
3068rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3069{
3070    tr_torrent * tor = NULL;
3071    tr_peerMgr * mgr = vmgr;
3072    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3073
3074    managerLock( mgr );
3075
3076    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3077        if( tor->isRunning ) {
3078            rechokeUploads( tor->torrentPeers, now );
3079            if( !tr_torrentIsSeed( tor ) )
3080                rechokeDownloads( tor->torrentPeers );
3081        }
3082    }
3083
3084    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3085    managerUnlock( mgr );
3086}
3087
3088/***
3089****
3090****  Life and Death
3091****
3092***/
3093
3094static tr_bool
3095shouldPeerBeClosed( const Torrent    * t,
3096                    const tr_peer    * peer,
3097                    int                peerCount,
3098                    const time_t       now )
3099{
3100    const tr_torrent *       tor = t->tor;
3101    const struct peer_atom * atom = peer->atom;
3102
3103    /* if it's marked for purging, close it */
3104    if( peer->doPurge )
3105    {
3106        tordbg( t, "purging peer %s because its doPurge flag is set",
3107                tr_atomAddrStr( atom ) );
3108        return TRUE;
3109    }
3110
3111    /* if we're seeding and the peer has everything we have,
3112     * and enough time has passed for a pex exchange, then disconnect */
3113    if( tr_torrentIsSeed( tor ) && ( peer->progress >= 1.0f ) )
3114        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3115
3116    /* disconnect if it's been too long since piece data has been transferred.
3117     * this is on a sliding scale based on number of available peers... */
3118    {
3119        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3120        /* if we have >= relaxIfFewerThan, strictness is 100%.
3121         * if we have zero connections, strictness is 0% */
3122        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3123                               ? 1.0
3124                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3125        const int lo = MIN_UPLOAD_IDLE_SECS;
3126        const int hi = MAX_UPLOAD_IDLE_SECS;
3127        const int limit = hi - ( ( hi - lo ) * strictness );
3128        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3129/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3130        if( idleTime > limit ) {
3131            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3132                       tr_atomAddrStr( atom ), idleTime );
3133            return TRUE;
3134        }
3135    }
3136
3137    return FALSE;
3138}
3139
3140static tr_peer **
3141getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3142{
3143    int i, peerCount, outsize;
3144    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3145    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3146
3147    assert( torrentIsLocked( t ) );
3148
3149    for( i = outsize = 0; i < peerCount; ++i )
3150        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3151            ret[outsize++] = peers[i];
3152
3153    *setmeSize = outsize;
3154    return ret;
3155}
3156
3157static int
3158getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3159{
3160    int sec;
3161
3162    /* if we were recently connected to this peer and transferring piece
3163     * data, try to reconnect to them sooner rather that later -- we don't
3164     * want network troubles to get in the way of a good peer. */
3165    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3166        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3167
3168    /* don't allow reconnects more often than our minimum */
3169    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3170        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3171
3172    /* otherwise, the interval depends on how many times we've tried
3173     * and failed to connect to the peer */
3174    else switch( atom->numFails ) {
3175        case 0: sec = 0; break;
3176        case 1: sec = 5; break;
3177        case 2: sec = 2 * 60; break;
3178        case 3: sec = 15 * 60; break;
3179        case 4: sec = 30 * 60; break;
3180        case 5: sec = 60 * 60; break;
3181        default: sec = 120 * 60; break;
3182    }
3183
3184    /* penalize peers that were unreachable the last time we tried */
3185    if( atom->flags2 & MYFLAG_UNREACHABLE )
3186        sec += sec;
3187
3188    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3189    return sec;
3190}
3191
3192static void
3193removePeer( Torrent * t, tr_peer * peer )
3194{
3195    tr_peer * removed;
3196    struct peer_atom * atom = peer->atom;
3197
3198    assert( torrentIsLocked( t ) );
3199    assert( atom );
3200
3201    atom->time = tr_time( );
3202
3203    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3204
3205    if( replicationExists( t ) )
3206        tr_decrReplicationFromBitset( t, &peer->have );
3207
3208    assert( removed == peer );
3209    peerDestructor( t, removed );
3210}
3211
3212static void
3213closePeer( Torrent * t, tr_peer * peer )
3214{
3215    struct peer_atom * atom;
3216
3217    assert( t != NULL );
3218    assert( peer != NULL );
3219
3220    atom = peer->atom;
3221
3222    /* if we transferred piece data, then they might be good peers,
3223       so reset their `numFails' weight to zero. otherwise we connected
3224       to them fruitlessly, so mark it as another fail */
3225    if( atom->piece_data_time ) {
3226        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3227        atom->numFails = 0;
3228    } else {
3229        ++atom->numFails;
3230        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3231    }
3232
3233    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3234    removePeer( t, peer );
3235}
3236
3237static void
3238removeAllPeers( Torrent * t )
3239{
3240    while( !tr_ptrArrayEmpty( &t->peers ) )
3241        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3242}
3243
3244static void
3245closeBadPeers( Torrent * t, const time_t now_sec )
3246{
3247    int i;
3248    int peerCount;
3249    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3250    for( i=0; i<peerCount; ++i )
3251        closePeer( t, peers[i] );
3252    tr_free( peers );
3253}
3254
3255struct peer_liveliness
3256{
3257    tr_peer * peer;
3258    void * clientData;
3259    time_t pieceDataTime;
3260    time_t time;
3261    int speed;
3262    tr_bool doPurge;
3263};
3264
3265static int
3266comparePeerLiveliness( const void * va, const void * vb )
3267{
3268    const struct peer_liveliness * a = va;
3269    const struct peer_liveliness * b = vb;
3270
3271    if( a->doPurge != b->doPurge )
3272        return a->doPurge ? 1 : -1;
3273
3274    if( a->speed != b->speed ) /* faster goes first */
3275        return a->speed > b->speed ? -1 : 1;
3276
3277    /* the one to give us data more recently goes first */
3278    if( a->pieceDataTime != b->pieceDataTime )
3279        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3280
3281    /* the one we connected to most recently goes first */
3282    if( a->time != b->time )
3283        return a->time > b->time ? -1 : 1;
3284
3285    return 0;
3286}
3287
3288static void
3289sortPeersByLivelinessImpl( tr_peer  ** peers,
3290                           void     ** clientData,
3291                           int         n,
3292                           uint64_t    now,
3293                           int (*compare) ( const void *va, const void *vb ) )
3294{
3295    int i;
3296    struct peer_liveliness *lives, *l;
3297
3298    /* build a sortable array of peer + extra info */
3299    lives = l = tr_new0( struct peer_liveliness, n );
3300    for( i=0; i<n; ++i, ++l )
3301    {
3302        tr_peer * p = peers[i];
3303        l->peer = p;
3304        l->doPurge = p->doPurge;
3305        l->pieceDataTime = p->atom->piece_data_time;
3306        l->time = p->atom->time;
3307        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3308                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3309        if( clientData )
3310            l->clientData = clientData[i];
3311    }
3312
3313    /* sort 'em */
3314    assert( n == ( l - lives ) );
3315    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3316
3317    /* build the peer array */
3318    for( i=0, l=lives; i<n; ++i, ++l ) {
3319        peers[i] = l->peer;
3320        if( clientData )
3321            clientData[i] = l->clientData;
3322    }
3323    assert( n == ( l - lives ) );
3324
3325    /* cleanup */
3326    tr_free( lives );
3327}
3328
3329static void
3330sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3331{
3332    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3333}
3334
3335
3336static void
3337enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3338{
3339    int n = tr_ptrArraySize( &t->peers );
3340    const int max = tr_torrentGetPeerLimit( t->tor );
3341    if( n > max )
3342    {
3343        void * base = tr_ptrArrayBase( &t->peers );
3344        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3345        sortPeersByLiveliness( peers, NULL, n, now );
3346        while( n > max )
3347            closePeer( t, peers[--n] );
3348        tr_free( peers );
3349    }
3350}
3351
3352static void
3353enforceSessionPeerLimit( tr_session * session, uint64_t now )
3354{
3355    int n = 0;
3356    tr_torrent * tor = NULL;
3357    const int max = tr_sessionGetPeerLimit( session );
3358
3359    /* count the total number of peers */
3360    while(( tor = tr_torrentNext( session, tor )))
3361        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3362
3363    /* if there are too many, prune out the worst */
3364    if( n > max )
3365    {
3366        tr_peer ** peers = tr_new( tr_peer*, n );
3367        Torrent ** torrents = tr_new( Torrent*, n );
3368
3369        /* populate the peer array */
3370        n = 0;
3371        tor = NULL;
3372        while(( tor = tr_torrentNext( session, tor ))) {
3373            int i;
3374            Torrent * t = tor->torrentPeers;
3375            const int tn = tr_ptrArraySize( &t->peers );
3376            for( i=0; i<tn; ++i, ++n ) {
3377                peers[n] = tr_ptrArrayNth( &t->peers, i );
3378                torrents[n] = t;
3379            }
3380        }
3381
3382        /* sort 'em */
3383        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3384
3385        /* cull out the crappiest */
3386        while( n-- > max )
3387            closePeer( torrents[n], peers[n] );
3388
3389        /* cleanup */
3390        tr_free( torrents );
3391        tr_free( peers );
3392    }
3393}
3394
3395static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3396
3397static void
3398reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3399{
3400    tr_torrent * tor;
3401    tr_peerMgr * mgr = vmgr;
3402    const time_t now_sec = tr_time( );
3403    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3404
3405    /**
3406    ***  enforce the per-session and per-torrent peer limits
3407    **/
3408
3409    /* if we're over the per-torrent peer limits, cull some peers */
3410    tor = NULL;
3411    while(( tor = tr_torrentNext( mgr->session, tor )))
3412        if( tor->isRunning )
3413            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3414
3415    /* if we're over the per-session peer limits, cull some peers */
3416    enforceSessionPeerLimit( mgr->session, now_msec );
3417
3418    /* remove crappy peers */
3419    tor = NULL;
3420    while(( tor = tr_torrentNext( mgr->session, tor )))
3421        if( !tor->torrentPeers->isRunning )
3422            removeAllPeers( tor->torrentPeers );
3423        else
3424            closeBadPeers( tor->torrentPeers, now_sec );
3425
3426    /* try to make new peer connections */
3427    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3428}
3429
3430/****
3431*****
3432*****  BANDWIDTH ALLOCATION
3433*****
3434****/
3435
3436static void
3437pumpAllPeers( tr_peerMgr * mgr )
3438{
3439    tr_torrent * tor = NULL;
3440
3441    while(( tor = tr_torrentNext( mgr->session, tor )))
3442    {
3443        int j;
3444        Torrent * t = tor->torrentPeers;
3445
3446        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3447        {
3448            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3449            tr_peerMsgsPulse( peer->msgs );
3450        }
3451    }
3452}
3453
3454static void
3455bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3456{
3457    tr_torrent * tor;
3458    tr_peerMgr * mgr = vmgr;
3459    managerLock( mgr );
3460
3461    /* FIXME: this next line probably isn't necessary... */
3462    pumpAllPeers( mgr );
3463
3464    /* allocate bandwidth to the peers */
3465    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3466    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3467
3468    /* possibly stop torrents that have seeded enough */
3469    tor = NULL;
3470    while(( tor = tr_torrentNext( mgr->session, tor )))
3471        tr_torrentCheckSeedLimit( tor );
3472
3473    /* run the completeness check for any torrents that need it */
3474    tor = NULL;
3475    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3476        if( tor->torrentPeers->needsCompletenessCheck ) {
3477            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3478            tr_torrentRecheckCompleteness( tor );
3479        }
3480    }
3481
3482    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3483     * during the peer-io callback call chain */
3484    tor = NULL;
3485    while(( tor = tr_torrentNext( mgr->session, tor )))
3486        if( tor->isStopping )
3487            tr_torrentStop( tor );
3488
3489    reconnectPulse( 0, 0, mgr );
3490
3491    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3492    managerUnlock( mgr );
3493}
3494
3495/***
3496****
3497***/
3498
3499static int
3500compareAtomPtrsByAddress( const void * va, const void *vb )
3501{
3502    const struct peer_atom * a = * (const struct peer_atom**) va;
3503    const struct peer_atom * b = * (const struct peer_atom**) vb;
3504
3505    assert( tr_isAtom( a ) );
3506    assert( tr_isAtom( b ) );
3507
3508    return tr_compareAddresses( &a->addr, &b->addr );
3509}
3510
3511/* best come first, worst go last */
3512static int
3513compareAtomPtrsByShelfDate( const void * va, const void *vb )
3514{
3515    time_t atime;
3516    time_t btime;
3517    const struct peer_atom * a = * (const struct peer_atom**) va;
3518    const struct peer_atom * b = * (const struct peer_atom**) vb;
3519    const int data_time_cutoff_secs = 60 * 60;
3520    const time_t tr_now = tr_time( );
3521
3522    assert( tr_isAtom( a ) );
3523    assert( tr_isAtom( b ) );
3524
3525    /* primary key: the last piece data time *if* it was within the last hour */
3526    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3527    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3528    if( atime != btime )
3529        return atime > btime ? -1 : 1;
3530
3531    /* secondary key: shelf date. */
3532    if( a->shelf_date != b->shelf_date )
3533        return a->shelf_date > b->shelf_date ? -1 : 1;
3534
3535    return 0;
3536}
3537
3538static int
3539getMaxAtomCount( const tr_torrent * tor )
3540{
3541    const int n = tor->maxConnectedPeers;
3542    /* approximate fit of the old jump discontinuous function */
3543    if( n >= 55 ) return     n + 150;
3544    if( n >= 20 ) return 2 * n + 95;
3545    return               4 * n + 55;
3546}
3547
3548static void
3549atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3550{
3551    tr_torrent * tor = NULL;
3552    tr_peerMgr * mgr = vmgr;
3553    managerLock( mgr );
3554
3555    while(( tor = tr_torrentNext( mgr->session, tor )))
3556    {
3557        int atomCount;
3558        Torrent * t = tor->torrentPeers;
3559        const int maxAtomCount = getMaxAtomCount( tor );
3560        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3561
3562        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3563        {
3564            int i;
3565            int keepCount = 0;
3566            int testCount = 0;
3567            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3568            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3569
3570            /* keep the ones that are in use */
3571            for( i=0; i<atomCount; ++i ) {
3572                struct peer_atom * atom = atoms[i];
3573                if( peerIsInUse( t, atom ) )
3574                    keep[keepCount++] = atom;
3575                else
3576                    test[testCount++] = atom;
3577            }
3578
3579            /* if there's room, keep the best of what's left */
3580            i = 0;
3581            if( keepCount < maxAtomCount ) {
3582                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3583                while( i<testCount && keepCount<maxAtomCount )
3584                    keep[keepCount++] = test[i++];
3585            }
3586
3587            /* free the culled atoms */
3588            while( i<testCount )
3589                tr_free( test[i++] );
3590
3591            /* rebuild Torrent.pool with what's left */
3592            tr_ptrArrayDestruct( &t->pool, NULL );
3593            t->pool = TR_PTR_ARRAY_INIT;
3594            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3595            for( i=0; i<keepCount; ++i )
3596                tr_ptrArrayAppend( &t->pool, keep[i] );
3597
3598            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3599
3600            /* cleanup */
3601            tr_free( test );
3602            tr_free( keep );
3603        }
3604    }
3605
3606    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3607    managerUnlock( mgr );
3608}
3609
3610/***
3611****
3612****
3613****
3614***/
3615
3616/* is this atom someone that we'd want to initiate a connection to? */
3617static tr_bool
3618isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3619{
3620    /* not if we're both seeds */
3621    if( tr_torrentIsSeed( tor ) )
3622        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3623            return FALSE;
3624
3625    /* not if we've already got a connection to them... */
3626    if( peerIsInUse( tor->torrentPeers, atom ) )
3627        return FALSE;
3628
3629    /* not if we just tried them already */
3630    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3631        return FALSE;
3632
3633    /* not if they're blocklisted */
3634    if( isAtomBlocklisted( tor->session, atom ) )
3635        return FALSE;
3636
3637    /* not if they're banned... */
3638    if( atom->flags2 & MYFLAG_BANNED )
3639        return FALSE;
3640
3641    return TRUE;
3642}
3643
3644struct peer_candidate
3645{
3646    uint64_t score;
3647    tr_torrent * tor;
3648    struct peer_atom * atom;
3649};
3650
3651static tr_bool
3652torrentWasRecentlyStarted( const tr_torrent * tor )
3653{
3654    return difftime( tr_time( ), tor->startDate ) < 120;
3655}
3656
/**
 * Append a field to the low end of a packed sort key.
 *
 * Shifts `value' left by `width' bits and ORs `addme' into the result.
 * `addme' is not masked, so the caller must make sure it fits in `width'
 * bits or it will spill into the fields added earlier.
 *
 * @return the new key
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    const uint64_t shifted = value << (uint64_t)width;

    return shifted | addme;
}
3664
3665/* smaller value is better */
3666static uint64_t
3667getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3668{
3669    uint64_t i;
3670    uint64_t score = 0;
3671    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3672
3673    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3674    i = failed ? 1 : 0;
3675    score = addValToKey( score, 1, i );
3676
3677    /* prefer the one we attempted least recently (to cycle through all peers) */
3678    i = atom->lastConnectionAttemptAt;
3679    score = addValToKey( score, 32, i );
3680
3681    /* prefer peers belonging to a torrent of a higher priority */
3682    switch( tr_torrentGetPriority( tor ) ) {
3683        case TR_PRI_HIGH:    i = 0; break;
3684        case TR_PRI_NORMAL:  i = 1; break;
3685        case TR_PRI_LOW:     i = 2; break;
3686    }
3687    score = addValToKey( score, 4, i );
3688
3689    /* prefer recently-started torrents */
3690    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3691    score = addValToKey( score, 1, i );
3692
3693    /* prefer torrents we're downloading with */
3694    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3695    score = addValToKey( score, 1, i );
3696
3697    /* prefer peers that are known to be connectible */
3698    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3699    score = addValToKey( score, 1, i );
3700
3701    /* prefer peers that we might have a chance of uploading to...
3702       so lower seed probability is better */
3703    if( atom->seedProbability == 100 ) i = 101;
3704    else if( atom->seedProbability == -1 ) i = 100;
3705    else i = atom->seedProbability;
3706    score = addValToKey( score, 8, i );
3707
3708    /* Prefer peers that we got from more trusted sources.
3709     * lower `fromBest' values indicate more trusted sources */
3710    score = addValToKey( score, 4, atom->fromBest );
3711
3712    /* salt */
3713    score = addValToKey( score, 8, salt );
3714
3715    return score;
3716}
3717
3718/* sort an array of peer candidates */
3719static int
3720comparePeerCandidates( const void * va, const void * vb )
3721{
3722    const struct peer_candidate * a = va;
3723    const struct peer_candidate * b = vb;
3724
3725    if( a->score < b->score ) return -1;
3726    if( a->score > b->score ) return 1;
3727
3728    return 0;
3729}
3730
3731/** @return an array of all the atoms we might want to connect to */
3732static struct peer_candidate*
3733getPeerCandidates( tr_session * session, int * candidateCount )
3734{
3735    int n;
3736    tr_torrent * tor;
3737    struct peer_candidate * candidates;
3738    struct peer_candidate * walk;
3739    const time_t now = tr_time( );
3740    const uint64_t now_msec = tr_time_msec( );
3741    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3742    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3743
3744    /* don't start any new handshakes if we're full up */
3745    n = 0;
3746    tor= NULL;
3747    while(( tor = tr_torrentNext( session, tor )))
3748        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3749    if( maxCandidates <= n ) {
3750        *candidateCount = 0;
3751        return NULL;
3752    }
3753
3754    /* allocate an array of candidates */
3755    n = 0;
3756    tor= NULL;
3757    while(( tor = tr_torrentNext( session, tor )))
3758        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3759    walk = candidates = tr_new( struct peer_candidate, n );
3760
3761    /* populate the candidate array */
3762    tor = NULL;
3763    while(( tor = tr_torrentNext( session, tor )))
3764    {
3765        int i, nAtoms;
3766        struct peer_atom ** atoms;
3767
3768        if( !tor->torrentPeers->isRunning )
3769            continue;
3770
3771        /* if we've already got enough peers in this torrent... */
3772        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3773            continue;
3774
3775        /* if we've already got enough speed in this torrent... */
3776        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3777            continue;
3778
3779        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3780        for( i=0; i<nAtoms; ++i )
3781        {
3782            struct peer_atom * atom = atoms[i];
3783
3784            if( isPeerCandidate( tor, atom, now ) )
3785            {
3786                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3787                walk->tor = tor;
3788                walk->atom = atom;
3789                walk->score = getPeerCandidateScore( tor, atom, salt );
3790                ++walk;
3791            }
3792        }
3793    }
3794
3795    *candidateCount = walk - candidates;
3796    if( *candidateCount > 1 )
3797        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3798    return candidates;
3799}
3800
3801static void
3802initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3803{
3804    tr_peerIo * io;
3805    const time_t now = tr_time( );
3806    tr_bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3807
3808    if( atom->fromFirst == TR_PEER_FROM_PEX )
3809        /* PEX has explicit signalling for uTP support.  If an atom
3810           originally came from PEX and doesn't have the uTP flag, skip the
3811           uTP connection attempt.  Are we being optimistic here? */
3812        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3813
3814    tordbg( t, "Starting an OUTGOING%s connection with %s",
3815            utp ? " µTP" : "",
3816            tr_atomAddrStr( atom ) );
3817
3818    io = tr_peerIoNewOutgoing( mgr->session,
3819                               mgr->session->bandwidth,
3820                               &atom->addr,
3821                               atom->port,
3822                               t->tor->info.hash,
3823                               t->tor->completeness == TR_SEED,
3824                               utp );
3825
3826    if( io == NULL )
3827    {
3828        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3829                tr_atomAddrStr( atom ) );
3830        atom->flags2 |= MYFLAG_UNREACHABLE;
3831        atom->numFails++;
3832    }
3833    else
3834    {
3835        tr_handshake * handshake = tr_handshakeNew( io,
3836                                                    mgr->session->encryptionMode,
3837                                                    myHandshakeDoneCB,
3838                                                    mgr );
3839
3840        assert( tr_peerIoGetTorrentHash( io ) );
3841
3842        tr_peerIoUnref( io ); /* balanced by the initial ref
3843                                 in tr_peerIoNewOutgoing() */
3844
3845        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3846                                 handshakeCompare );
3847    }
3848
3849    atom->lastConnectionAttemptAt = now;
3850    atom->time = now;
3851}
3852
3853static void
3854initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3855{
3856#if 0
3857    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3858             tr_atomAddrStr( c->atom ),
3859             tr_torrentName( c->tor ),
3860             (int)c->atom->seedProbability,
3861             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3862             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3863#endif
3864
3865    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3866}
3867
3868static void
3869makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3870{
3871    int i, n;
3872    struct peer_candidate * candidates;
3873
3874    candidates = getPeerCandidates( mgr->session, &n );
3875
3876    for( i=0; i<n && i<max; ++i )
3877        initiateCandidateConnection( mgr, &candidates[i] );
3878
3879    tr_free( candidates );
3880}
Note: See TracBrowser for help on using the repository browser.