source: trunk/libtransmission/peer-mgr.c @ 11960

Last change on this file since 11960 was 11960, checked in by jordan, 11 years ago

add configure script switch to enable/disable utp

  • Property svn:keywords set to Date Rev Author Id
File size: 114.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 11960 2011-02-18 00:45:44Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to cull old atoms */
46    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
47
48    /* how frequently to change which peers are choked */
49    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
50
51    /* an optimistically unchoked peer is immune from rechoking
52       for this many calls to rechokeUploads(). */
53    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
54
55    /* how frequently to reallocate bandwidth */
56    BANDWIDTH_PERIOD_MSEC = 500,
57
58    /* how frequently to age out old piece request lists */
59    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
60
61    /* how frequently to decide which peers live and die */
62    RECONNECT_PERIOD_MSEC = 500,
63
64    /* when many peers are available, keep idle ones this long */
65    MIN_UPLOAD_IDLE_SECS = ( 60 ),
66
67    /* when few peers are available, keep idle ones this long */
68    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
69
70    /* max number of peers to ask for per second overall.
71     * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 12,
73
74    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
75
76    /* number of bad pieces a peer is allowed to send before we ban them */
77    MAX_BAD_PIECES_PER_PEER = 5,
78
79    /* amount of time to keep a list of request pieces lying around
80       before it's considered too old and needs to be rebuilt */
81    PIECE_LIST_SHELF_LIFE_SECS = 60,
82
83    /* use for bitwise operations w/peer_atom.flags2 */
84    MYFLAG_BANNED = 1,
85
86    /* use for bitwise operations w/peer_atom.flags2 */
87    /* unreachable for now... but not banned.
88     * if they try to connect to us it's okay */
89    MYFLAG_UNREACHABLE = 2,
90
91    /* the minimum we'll wait before attempting to reconnect to a peer */
92    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
93
94    /** how long we'll let requests we've made linger before we cancel them */
95    REQUEST_TTL_SECS = 120,
96
97    NO_BLOCKS_CANCEL_HISTORY = 120,
98
99    CANCEL_HISTORY_SEC = 60
100};
101
102const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
108enum
109{
110    UPLOAD_ONLY_UKNOWN,
111    UPLOAD_ONLY_YES,
112    UPLOAD_ONLY_NO
113};
114
115/**
116 * Peer information that should be kept even before we've connected and
117 * after we've disconnected. These are kept in a pool of peer_atoms to decide
118 * which ones would make good candidates for connecting to, and to watch out
119 * for banned peers.
120 *
121 * @see tr_peer
122 * @see tr_peermsgs
123 */
124struct peer_atom
125{
126    uint8_t     fromFirst;          /* where the peer was first found */
127    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
128    uint8_t     flags;              /* these match the added_f flags */
129    uint8_t     flags2;             /* flags that aren't defined in added_f */
130    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
131    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
132    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */
133
134    tr_port     port;
135    tr_bool     utp_failed;         /* We recently failed to connect over uTP */
136    uint16_t    numFails;
137    time_t      time;               /* when the peer's connection status last changed */
138    time_t      piece_data_time;
139
140    time_t      lastConnectionAttemptAt;
141    time_t      lastConnectionAt;
142
143    /* similar to a TTL field, but less rigid --
144     * if the swarm is small, the atom will be kept past this date. */
145    time_t      shelf_date;
146    tr_peer   * peer;               /* will be NULL if not connected */
147    tr_address  addr;
148};
149
150#ifdef NDEBUG
151#define tr_isAtom(a) (TRUE)
152#else
153static tr_bool
154tr_isAtom( const struct peer_atom * atom )
155{
156    return ( atom != NULL )
157        && ( atom->fromFirst < TR_PEER_FROM__MAX )
158        && ( atom->fromBest < TR_PEER_FROM__MAX )
159        && ( tr_isAddress( &atom->addr ) );
160}
161#endif
162
163static const char*
164tr_atomAddrStr( const struct peer_atom * atom )
165{
166    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
167}
168
169struct block_request
170{
171    tr_block_index_t block;
172    tr_peer * peer;
173    time_t sentAt;
174};
175
176struct weighted_piece
177{
178    tr_piece_index_t index;
179    int16_t salt;
180    int16_t requestCount;
181};
182
183enum piece_sort_state
184{
185    PIECES_UNSORTED,
186    PIECES_SORTED_BY_INDEX,
187    PIECES_SORTED_BY_WEIGHT
188};
189
190/** @brief Opaque, per-torrent data structure for peer connection information */
191typedef struct tr_torrent_peers
192{
193    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
194    tr_ptrArray                pool; /* struct peer_atom */
195    tr_ptrArray                peers; /* tr_peer */
196    tr_ptrArray                webseeds; /* tr_webseed */
197
198    tr_torrent               * tor;
199    struct tr_peerMgr        * manager;
200
201    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
202    int                        optimisticUnchokeTimeScaler;
203
204    tr_bool                    isRunning;
205    tr_bool                    needsCompletenessCheck;
206
207    struct block_request     * requests;
208    int                        requestCount;
209    int                        requestAlloc;
210
211    struct weighted_piece    * pieces;
212    int                        pieceCount;
213    enum piece_sort_state      pieceSortState;
214
215    /* An array of pieceCount items stating how many peers have each piece.
216       This is used to help us for downloading pieces "rarest first."
217       This may be NULL if we don't have metainfo yet, or if we're not
218       downloading and don't care about rarity */
219    uint16_t                 * pieceReplication;
220    size_t                     pieceReplicationSize;
221
222    int                        interestedCount;
223    int                        maxPeers;
224    time_t                     lastCancel;
225
226    /* Before the endgame this should be 0. In endgame, is contains the average
227     * number of pending requests per peer. Only peers which have more pending
228     * requests are considered 'fast' are allowed to request a block that's
229     * already been requested from another (slower?) peer. */
230    int                        endgame;
231}
232Torrent;
233
234struct tr_peerMgr
235{
236    tr_session    * session;
237    tr_ptrArray     incomingHandshakes; /* tr_handshake */
238    struct event  * bandwidthTimer;
239    struct event  * rechokeTimer;
240    struct event  * refillUpkeepTimer;
241    struct event  * atomTimer;
242};
243
244#define tordbg( t, ... ) \
245    do { \
246        if( tr_deepLoggingIsActive( ) ) \
247            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
248    } while( 0 )
249
250#define dbgmsg( ... ) \
251    do { \
252        if( tr_deepLoggingIsActive( ) ) \
253            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
254    } while( 0 )
255
256/**
257***
258**/
259
260static inline void
261managerLock( const struct tr_peerMgr * manager )
262{
263    tr_sessionLock( manager->session );
264}
265
266static inline void
267managerUnlock( const struct tr_peerMgr * manager )
268{
269    tr_sessionUnlock( manager->session );
270}
271
272static inline void
273torrentLock( Torrent * torrent )
274{
275    managerLock( torrent->manager );
276}
277
278static inline void
279torrentUnlock( Torrent * torrent )
280{
281    managerUnlock( torrent->manager );
282}
283
284static inline int
285torrentIsLocked( const Torrent * t )
286{
287    return tr_sessionIsLocked( t->manager->session );
288}
289
290/**
291***
292**/
293
294static int
295handshakeCompareToAddr( const void * va, const void * vb )
296{
297    const tr_handshake * a = va;
298
299    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
300}
301
302static int
303handshakeCompare( const void * a, const void * b )
304{
305    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
306}
307
308static inline tr_handshake*
309getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
310{
311    if( tr_ptrArrayEmpty( handshakes ) )
312        return NULL;
313
314    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
315}
316
317static int
318comparePeerAtomToAddress( const void * va, const void * vb )
319{
320    const struct peer_atom * a = va;
321
322    return tr_compareAddresses( &a->addr, vb );
323}
324
325static int
326compareAtomsByAddress( const void * va, const void * vb )
327{
328    const struct peer_atom * b = vb;
329
330    assert( tr_isAtom( b ) );
331
332    return comparePeerAtomToAddress( va, &b->addr );
333}
334
335/**
336***
337**/
338
339const tr_address *
340tr_peerAddress( const tr_peer * peer )
341{
342    return &peer->atom->addr;
343}
344
345static Torrent*
346getExistingTorrent( tr_peerMgr *    manager,
347                    const uint8_t * hash )
348{
349    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
350
351    return tor == NULL ? NULL : tor->torrentPeers;
352}
353
354static int
355peerCompare( const void * a, const void * b )
356{
357    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
358}
359
360static struct peer_atom*
361getExistingAtom( const Torrent    * t,
362                 const tr_address * addr )
363{
364    Torrent * tt = (Torrent*)t;
365    assert( torrentIsLocked( t ) );
366    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
367}
368
369static tr_bool
370peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
371{
372    Torrent * t = (Torrent*) ct;
373
374    assert( torrentIsLocked ( t ) );
375
376    return ( atom->peer != NULL )
377        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
378        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
379}
380
381static tr_peer*
382peerConstructor( struct peer_atom * atom )
383{
384    tr_peer * peer = tr_new0( tr_peer, 1 );
385
386    tr_bitsetConstructor( &peer->have, 0 );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
394    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
395
396    return peer;
397}
398
399static tr_peer*
400getPeer( Torrent * torrent, struct peer_atom * atom )
401{
402    tr_peer * peer;
403
404    assert( torrentIsLocked( torrent ) );
405
406    peer = atom->peer;
407
408    if( peer == NULL )
409    {
410        peer = peerConstructor( atom );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419static void
420peerDestructor( Torrent * t, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    peerDeclinedAllRequests( t, peer );
425
426    if( peer->msgs != NULL )
427        tr_peerMsgsFree( peer->msgs );
428
429    tr_peerIoClear( peer->io );
430    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
431
432    tr_historyFree( peer->blocksSentToClient  );
433    tr_historyFree( peer->blocksSentToPeer    );
434    tr_historyFree( peer->cancelsSentToClient );
435    tr_historyFree( peer->cancelsSentToPeer   );
436
437    tr_bitsetDestructor( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440    peer->atom->peer = NULL;
441
442    tr_free( peer );
443}
444
445static tr_bool
446replicationExists( const Torrent * t )
447{
448    return t->pieceReplication != NULL;
449}
450
451static void
452replicationFree( Torrent * t )
453{
454    tr_free( t->pieceReplication );
455    t->pieceReplication = NULL;
456    t->pieceReplicationSize = 0;
457}
458
459static void
460replicationNew( Torrent * t )
461{
462    tr_piece_index_t piece_i;
463    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
464    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
465    const int peer_count = tr_ptrArraySize( &t->peers );
466
467    assert( !replicationExists( t ) );
468
469    t->pieceReplicationSize = piece_count;
470    t->pieceReplication = tr_new0( uint16_t, piece_count );
471
472    for( piece_i=0; piece_i<piece_count; ++piece_i )
473    {
474        int peer_i;
475        uint16_t r = 0;
476
477        for( peer_i=0; peer_i<peer_count; ++peer_i )
478            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
479                ++r;
480
481        t->pieceReplication[piece_i] = r;
482    }
483}
484
485static void
486torrentDestructor( void * vt )
487{
488    Torrent * t = vt;
489
490    assert( t );
491    assert( !t->isRunning );
492    assert( torrentIsLocked( t ) );
493    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
494    assert( tr_ptrArrayEmpty( &t->peers ) );
495
496    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
497    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
498    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
499    tr_ptrArrayDestruct( &t->peers, NULL );
500
501    replicationFree( t );
502
503    tr_free( t->requests );
504    tr_free( t->pieces );
505    tr_free( t );
506}
507
508static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
509
510static Torrent*
511torrentConstructor( tr_peerMgr * manager,
512                    tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    return m;
542}
543
544static void
545deleteTimer( struct event ** t )
546{
547    if( *t != NULL )
548    {
549        event_free( *t );
550        *t = NULL;
551    }
552}
553
554static void
555deleteTimers( struct tr_peerMgr * m )
556{
557    deleteTimer( &m->atomTimer );
558    deleteTimer( &m->bandwidthTimer );
559    deleteTimer( &m->rechokeTimer );
560    deleteTimer( &m->refillUpkeepTimer );
561}
562
563void
564tr_peerMgrFree( tr_peerMgr * manager )
565{
566    managerLock( manager );
567
568    deleteTimers( manager );
569
570    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
571     * the item from manager->handshakes, so this is a little roundabout... */
572    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
573        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
574
575    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
576
577    managerUnlock( manager );
578    tr_free( manager );
579}
580
581static int
582clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
583{
584    if( !tr_torrentHasMetadata( tor ) )
585        return TRUE;
586
587    return peer->clientIsInterested && !peer->clientIsChoked;
588}
589
590static int
591clientIsUploadingTo( const tr_peer * peer )
592{
593    return peer->peerIsInterested && !peer->peerIsChoked;
594}
595
596/***
597****
598***/
599
600void
601tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
602{
603    tr_torrent * tor = NULL;
604    tr_session * session = mgr->session;
605
606    /* we cache whether or not a peer is blocklisted...
607       since the blocklist has changed, erase that cached value */
608    while(( tor = tr_torrentNext( session, tor )))
609    {
610        int i;
611        Torrent * t = tor->torrentPeers;
612        const int n = tr_ptrArraySize( &t->pool );
613        for( i=0; i<n; ++i ) {
614            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
615            atom->blocklisted = -1;
616        }
617    }
618}
619
620static tr_bool
621isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
622{
623    if( atom->blocklisted < 0 )
624        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
625
626    assert( tr_isBool( atom->blocklisted ) );
627    return atom->blocklisted;
628}
629
630
631/***
632****
633***/
634
635static void
636atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
637{
638    assert( atom != NULL );
639    assert( -1<=seedProbability && seedProbability<=100 );
640
641    atom->seedProbability = seedProbability;
642
643    if( seedProbability == 100 )
644        atom->flags |= ADDED_F_SEED_FLAG;
645    else if( seedProbability != -1 )
646        atom->flags &= ~ADDED_F_SEED_FLAG;
647}
648
649static void
650atomSetSeed( struct peer_atom * atom )
651{
652    atomSetSeedProbability( atom, 100 );
653}
654
655static inline tr_bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661tr_bool
662tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
663                      const tr_address  * addr )
664{
665    tr_bool isSeed = FALSE;
666    const Torrent * t = tor->torrentPeers;
667    const struct peer_atom * atom = getExistingAtom( t, addr );
668
669    if( atom )
670        isSeed = atomIsSeed( atom );
671
672    return isSeed;
673}
674
675void
676tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
677{
678    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
679
680    if( atom )
681        atom->flags |= ADDED_F_UTP_FLAGS;
682}
683
684void
685tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, tr_bool failed )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->utp_failed = failed;
691}
692
693
694/**
695***  REQUESTS
696***
697*** There are two data structures associated with managing block requests:
698***
699*** 1. Torrent::requests, an array of "struct block_request" which keeps
700***    track of which blocks have been requested, and when, and by which peers.
701***    This is list is used for (a) cancelling requests that have been pending
702***    for too long and (b) avoiding duplicate requests before endgame.
703***
704*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
705***    pieces that we want to request. It's used to decide which blocks to
706***    return next when tr_peerMgrGetBlockRequests() is called.
707**/
708
709/**
710*** struct block_request
711**/
712
713static int
714compareReqByBlock( const void * va, const void * vb )
715{
716    const struct block_request * a = va;
717    const struct block_request * b = vb;
718
719    /* primary key: block */
720    if( a->block < b->block ) return -1;
721    if( a->block > b->block ) return 1;
722
723    /* secondary key: peer */
724    if( a->peer < b->peer ) return -1;
725    if( a->peer > b->peer ) return 1;
726
727    return 0;
728}
729
730static void
731requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
732{
733    struct block_request key;
734
735    /* ensure enough room is available... */
736    if( t->requestCount + 1 >= t->requestAlloc )
737    {
738        const int CHUNK_SIZE = 128;
739        t->requestAlloc += CHUNK_SIZE;
740        t->requests = tr_renew( struct block_request,
741                                t->requests, t->requestAlloc );
742    }
743
744    /* populate the record we're inserting */
745    key.block = block;
746    key.peer = peer;
747    key.sentAt = tr_time( );
748
749    /* insert the request to our array... */
750    {
751        tr_bool exact;
752        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
753                                       sizeof( struct block_request ),
754                                       compareReqByBlock, &exact );
755        assert( !exact );
756        memmove( t->requests + pos + 1,
757                 t->requests + pos,
758                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
759        t->requests[pos] = key;
760    }
761
762    if( peer != NULL )
763    {
764        ++peer->pendingReqsToPeer;
765        assert( peer->pendingReqsToPeer >= 0 );
766    }
767
768    /*fprintf( stderr, "added request of block %lu from peer %s... "
769                       "there are now %d block\n",
770                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
771}
772
773static struct block_request *
774requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
775{
776    struct block_request key;
777    key.block = block;
778    key.peer = (tr_peer*) peer;
779
780    return bsearch( &key, t->requests, t->requestCount,
781                    sizeof( struct block_request ),
782                    compareReqByBlock );
783}
784
785/**
786 * Find the peers are we currently requesting the block
787 * with index @a block from and append them to @a peerArr.
788 */
789static void
790getBlockRequestPeers( Torrent * t, tr_block_index_t block,
791                      tr_ptrArray * peerArr )
792{
793    tr_bool exact;
794    int i, pos;
795    struct block_request key;
796
797    key.block = block;
798    key.peer = NULL;
799    pos = tr_lowerBound( &key, t->requests, t->requestCount,
800                         sizeof( struct block_request ),
801                         compareReqByBlock, &exact );
802
803    assert( !exact ); /* shouldn't have a request with .peer == NULL */
804
805    for( i = pos; i < t->requestCount; ++i )
806    {
807        if( t->requests[i].block != block )
808            break;
809        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
810    }
811}
812
813static void
814decrementPendingReqCount( const struct block_request * b )
815{
816    if( b->peer != NULL )
817        if( b->peer->pendingReqsToPeer > 0 )
818            --b->peer->pendingReqsToPeer;
819}
820
821static void
822requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
823{
824    const struct block_request * b = requestListLookup( t, block, peer );
825    if( b != NULL )
826    {
827        const int pos = b - t->requests;
828        assert( pos < t->requestCount );
829
830        decrementPendingReqCount( b );
831
832        tr_removeElementFromArray( t->requests,
833                                   pos,
834                                   sizeof( struct block_request ),
835                                   t->requestCount-- );
836
837        /*fprintf( stderr, "removing request of block %lu from peer %s... "
838                           "there are now %d block requests left\n",
839                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
840    }
841}
842
843static int
844countActiveWebseeds( const Torrent * t )
845{
846    int activeCount = 0;
847    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
848    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
849
850    for( ; w!=wend; ++w )
851        if( tr_webseedIsActive( *w ) )
852            ++activeCount;
853
854    return activeCount;
855}
856
857static void
858updateEndgame( Torrent * t )
859{
860    const tr_torrent * tor = t->tor;
861    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
862
863    assert( t->requestCount >= 0 );
864
865    if( (tr_block_index_t) t->requestCount < missing )
866    {
867        /* not in endgame */
868        t->endgame = 0;
869    }
870    else if( !t->endgame ) /* only recalculate when endgame first begins */
871    {
872        int numDownloading = 0;
873        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
874        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
875
876        /* add the active bittorrent peers... */
877        for( ; p!=pend; ++p )
878            if( (*p)->pendingReqsToPeer > 0 )
879                ++numDownloading;
880
881        /* add the active webseeds... */
882        numDownloading += countActiveWebseeds( t );
883
884        /* average number of pending requests per downloading peer */
885        t->endgame = t->requestCount / MAX( numDownloading, 1 );
886    }
887}
888
889
890/****
891*****
892*****  Piece List Manipulation / Accessors
893*****
894****/
895
896static inline void
897invalidatePieceSorting( Torrent * t )
898{
899    t->pieceSortState = PIECES_UNSORTED;
900}
901
902const tr_torrent * weightTorrent;
903
904const uint16_t * weightReplication;
905
906static void
907setComparePieceByWeightTorrent( Torrent * t )
908{
909    if( !replicationExists( t ) )
910        replicationNew( t );
911
912    weightTorrent = t->tor;
913    weightReplication = t->pieceReplication;
914}
915
916/* we try to create a "weight" s.t. high-priority pieces come before others,
917 * and that partially-complete pieces come before empty ones. */
918static int
919comparePieceByWeight( const void * va, const void * vb )
920{
921    const struct weighted_piece * a = va;
922    const struct weighted_piece * b = vb;
923    int ia, ib, missing, pending;
924    const tr_torrent * tor = weightTorrent;
925    const uint16_t * rep = weightReplication;
926
927    /* primary key: weight */
928    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
929    pending = a->requestCount;
930    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
931    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
932    pending = b->requestCount;
933    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
934    if( ia < ib ) return -1;
935    if( ia > ib ) return 1;
936
937    /* secondary key: higher priorities go first */
938    ia = tor->info.pieces[a->index].priority;
939    ib = tor->info.pieces[b->index].priority;
940    if( ia > ib ) return -1;
941    if( ia < ib ) return 1;
942
943    /* tertiary key: rarest first. */
944    ia = rep[a->index];
945    ib = rep[b->index];
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* quaternary key: random */
950    if( a->salt < b->salt ) return -1;
951    if( a->salt > b->salt ) return 1;
952
953    /* okay, they're equal */
954    return 0;
955}
956
957static int
958comparePieceByIndex( const void * va, const void * vb )
959{
960    const struct weighted_piece * a = va;
961    const struct weighted_piece * b = vb;
962    if( a->index < b->index ) return -1;
963    if( a->index > b->index ) return 1;
964    return 0;
965}
966
967static void
968pieceListSort( Torrent * t, enum piece_sort_state state )
969{
970    assert( state==PIECES_SORTED_BY_INDEX
971         || state==PIECES_SORTED_BY_WEIGHT );
972
973
974    if( state == PIECES_SORTED_BY_WEIGHT )
975    {
976        setComparePieceByWeightTorrent( t );
977        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
978    }
979    else
980        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
981
982    t->pieceSortState = state;
983}
984
985/**
986 * These functions are useful for testing, but too expensive for nightly builds.
987 * let's leave it disabled but add an easy hook to compile it back in
988 */
989#if 1
990#define assertWeightedPiecesAreSorted(t)
991#define assertReplicationCountIsExact(t)
992#else
993static void
994assertWeightedPiecesAreSorted( Torrent * t )
995{
996    if( !t->endgame )
997    {
998        int i;
999        setComparePieceByWeightTorrent( t );
1000        for( i=0; i<t->pieceCount-1; ++i )
1001            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1002    }
1003}
1004static void
1005assertReplicationCountIsExact( Torrent * t )
1006{
1007    /* This assert might fail due to errors of implementations in other
1008     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1009     * from a client. If a such a behavior is noticed,
1010     * a bug report should be filled to the faulty client. */
1011
1012    size_t piece_i;
1013    const uint16_t * rep = t->pieceReplication;
1014    const size_t piece_count = t->pieceReplicationSize;
1015    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1016    const int peer_count = tr_ptrArraySize( &t->peers );
1017
1018    assert( piece_count == t->tor->info.pieceCount );
1019
1020    for( piece_i=0; piece_i<piece_count; ++piece_i )
1021    {
1022        int peer_i;
1023        uint16_t r = 0;
1024
1025        for( peer_i=0; peer_i<peer_count; ++peer_i )
1026            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
1027                ++r;
1028
1029        assert( rep[piece_i] == r );
1030    }
1031}
1032#endif
1033
1034static struct weighted_piece *
1035pieceListLookup( Torrent * t, tr_piece_index_t index )
1036{
1037    int i;
1038
1039    for( i=0; i<t->pieceCount; ++i )
1040        if( t->pieces[i].index == index )
1041            return &t->pieces[i];
1042
1043    return NULL;
1044}
1045
1046static void
1047pieceListRebuild( Torrent * t )
1048{
1049
1050    if( !tr_torrentIsSeed( t->tor ) )
1051    {
1052        tr_piece_index_t i;
1053        tr_piece_index_t * pool;
1054        tr_piece_index_t poolCount = 0;
1055        const tr_torrent * tor = t->tor;
1056        const tr_info * inf = tr_torrentInfo( tor );
1057        struct weighted_piece * pieces;
1058        int pieceCount;
1059
1060        /* build the new list */
1061        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1062        for( i=0; i<inf->pieceCount; ++i )
1063            if( !inf->pieces[i].dnd )
1064                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1065                    pool[poolCount++] = i;
1066        pieceCount = poolCount;
1067        pieces = tr_new0( struct weighted_piece, pieceCount );
1068        for( i=0; i<poolCount; ++i ) {
1069            struct weighted_piece * piece = pieces + i;
1070            piece->index = pool[i];
1071            piece->requestCount = 0;
1072            piece->salt = tr_cryptoWeakRandInt( 4096 );
1073        }
1074
1075        /* if we already had a list of pieces, merge it into
1076         * the new list so we don't lose its requestCounts */
1077        if( t->pieces != NULL )
1078        {
1079            struct weighted_piece * o = t->pieces;
1080            struct weighted_piece * oend = o + t->pieceCount;
1081            struct weighted_piece * n = pieces;
1082            struct weighted_piece * nend = n + pieceCount;
1083
1084            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1085
1086            while( o!=oend && n!=nend ) {
1087                if( o->index < n->index )
1088                    ++o;
1089                else if( o->index > n->index )
1090                    ++n;
1091                else
1092                    *n++ = *o++;
1093            }
1094
1095            tr_free( t->pieces );
1096        }
1097
1098        t->pieces = pieces;
1099        t->pieceCount = pieceCount;
1100
1101        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1102
1103        /* cleanup */
1104        tr_free( pool );
1105    }
1106}
1107
1108static void
1109pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1110{
1111    struct weighted_piece * p;
1112
1113    if(( p = pieceListLookup( t, piece )))
1114    {
1115        const int pos = p - t->pieces;
1116
1117        tr_removeElementFromArray( t->pieces,
1118                                   pos,
1119                                   sizeof( struct weighted_piece ),
1120                                   t->pieceCount-- );
1121
1122        if( t->pieceCount == 0 )
1123        {
1124            tr_free( t->pieces );
1125            t->pieces = NULL;
1126        }
1127    }
1128}
1129
1130static void
1131pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1132{
1133    int pos;
1134    tr_bool isSorted = TRUE;
1135
1136    if( p == NULL )
1137        return;
1138
1139    /* is the torrent already sorted? */
1140    pos = p - t->pieces;
1141    setComparePieceByWeightTorrent( t );
1142    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1143        isSorted = FALSE;
1144    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1145        isSorted = FALSE;
1146
1147    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1148    {
1149       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1150       isSorted = TRUE;
1151    }
1152
1153    /* if it's not sorted, move it around */
1154    if( !isSorted )
1155    {
1156        tr_bool exact;
1157        const struct weighted_piece tmp = *p;
1158
1159        tr_removeElementFromArray( t->pieces,
1160                                   pos,
1161                                   sizeof( struct weighted_piece ),
1162                                   t->pieceCount-- );
1163
1164        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1165                             sizeof( struct weighted_piece ),
1166                             comparePieceByWeight, &exact );
1167
1168        memmove( &t->pieces[pos + 1],
1169                 &t->pieces[pos],
1170                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1171
1172        t->pieces[pos] = tmp;
1173    }
1174
1175    assertWeightedPiecesAreSorted( t );
1176}
1177
1178static void
1179pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1180{
1181    struct weighted_piece * p;
1182    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1183
1184    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1185    {
1186        --p->requestCount;
1187        pieceListResortPiece( t, p );
1188    }
1189}
1190
1191
1192/****
1193*****
1194*****  Replication count ( for rarest first policy )
1195*****
1196****/
1197
1198/**
1199 * Increase the replication count of this piece and sort it if the
1200 * piece list is already sorted
1201 */
1202static void
1203tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1204{
1205    assert( replicationExists( t ) );
1206    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1207
1208    /* One more replication of this piece is present in the swarm */
1209    ++t->pieceReplication[index];
1210
1211    /* we only resort the piece if the list is already sorted */
1212    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1213        pieceListResortPiece( t, pieceListLookup( t, index ) );
1214}
1215
1216/**
1217 * Increases the replication count of pieces present in the bitfield
1218 */
1219static void
1220tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1221{
1222    size_t i;
1223    uint16_t * rep = t->pieceReplication;
1224    const size_t n = t->tor->info.pieceCount;
1225
1226    assert( replicationExists( t ) );
1227    assert( n == t->pieceReplicationSize );
1228    assert( tr_bitfieldTestFast( b, n-1 ) );
1229
1230    for( i=0; i<n; ++i )
1231        if( tr_bitfieldHas( b, i ) )
1232            ++rep[i];
1233
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        invalidatePieceSorting( t );
1236}
1237
1238/**
1239 * Increase the replication count of every piece
1240 */
1241static void
1242tr_incrReplication( Torrent * t )
1243{
1244    int i;
1245    const int n = t->pieceReplicationSize;
1246
1247    assert( replicationExists( t ) );
1248    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1249
1250    for( i=0; i<n; ++i )
1251        ++t->pieceReplication[i];
1252}
1253
1254/**
1255 * Decrease the replication count of pieces present in the bitset.
1256 */
1257static void
1258tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1259{
1260    int i;
1261    const int n = t->pieceReplicationSize;
1262
1263    assert( replicationExists( t ) );
1264    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1265
1266    if( bitset->haveAll )
1267    {
1268        for( i=0; i<n; ++i )
1269            --t->pieceReplication[i];
1270    }
1271    else if ( !bitset->haveNone )
1272    {
1273        const tr_bitfield * const b = &bitset->bitfield;
1274
1275        for( i=0; i<n; ++i )
1276            if( tr_bitfieldHas( b, i ) )
1277                --t->pieceReplication[i];
1278
1279        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1280            invalidatePieceSorting( t );
1281    }
1282}
1283
1284/**
1285***
1286**/
1287
1288void
1289tr_peerMgrRebuildRequests( tr_torrent * tor )
1290{
1291    assert( tr_isTorrent( tor ) );
1292
1293    pieceListRebuild( tor->torrentPeers );
1294}
1295
1296void
1297tr_peerMgrGetNextRequests( tr_torrent           * tor,
1298                           tr_peer              * peer,
1299                           int                    numwant,
1300                           tr_block_index_t     * setme,
1301                           int                  * numgot )
1302{
1303    int i;
1304    int got;
1305    Torrent * t;
1306    struct weighted_piece * pieces;
1307    const tr_bitset * have = &peer->have;
1308
1309    /* sanity clause */
1310    assert( tr_isTorrent( tor ) );
1311    assert( peer->clientIsInterested );
1312    assert( !peer->clientIsChoked );
1313    assert( numwant > 0 );
1314
1315    /* walk through the pieces and find blocks that should be requested */
1316    got = 0;
1317    t = tor->torrentPeers;
1318
1319    /* prep the pieces list */
1320    if( t->pieces == NULL )
1321        pieceListRebuild( t );
1322
1323    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1324        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1325
1326    assertReplicationCountIsExact( t );
1327    assertWeightedPiecesAreSorted( t );
1328
1329    updateEndgame( t );
1330    pieces = t->pieces;
1331    for( i=0; i<t->pieceCount && got<numwant; ++i )
1332    {
1333        struct weighted_piece * p = pieces + i;
1334
1335        /* if the peer has this piece that we want... */
1336        if( tr_bitsetHasFast( have, p->index ) )
1337        {
1338            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1339            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1340            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1341
1342            for( ; b!=e && got<numwant; ++b )
1343            {
1344                int peerCount;
1345                tr_peer ** peers;
1346
1347                /* don't request blocks we've already got */
1348                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1349                    continue;
1350
1351                /* always add peer if this block has no peers yet */
1352                tr_ptrArrayClear( &peerArr );
1353                getBlockRequestPeers( t, b, &peerArr );
1354                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1355                if( peerCount != 0 )
1356                {
1357                    /* don't make a second block request until the endgame */
1358                    if( !t->endgame )
1359                        continue;
1360
1361                    /* don't have more than two peers requesting this block */
1362                    if( peerCount > 1 )
1363                        continue;
1364
1365                    /* don't send the same request to the same peer twice */
1366                    if( peer == peers[0] )
1367                        continue;
1368
1369                    /* in the endgame allow an additional peer to download a
1370                       block but only if the peer seems to be handling requests
1371                       relatively fast */
1372                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1373                        continue;
1374                }
1375
1376                /* update the caller's table */
1377                setme[got++] = b;
1378
1379                /* update our own tables */
1380                requestListAdd( t, b, peer );
1381                ++p->requestCount;
1382            }
1383
1384            tr_ptrArrayDestruct( &peerArr, NULL );
1385        }
1386    }
1387
1388    /* In most cases we've just changed the weights of a small number of pieces.
1389     * So rather than qsort()ing the entire array, it's faster to apply an
1390     * adaptive insertion sort algorithm. */
1391    if( got > 0 )
1392    {
1393        /* not enough requests || last piece modified */
1394        if ( i == t->pieceCount ) --i;
1395
1396        setComparePieceByWeightTorrent( t );
1397        while( --i >= 0 )
1398        {
1399            tr_bool exact;
1400
1401            /* relative position! */
1402            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1403                                              t->pieceCount - (i + 1),
1404                                              sizeof( struct weighted_piece ),
1405                                              comparePieceByWeight, &exact );
1406            if( newpos > 0 )
1407            {
1408                const struct weighted_piece piece = t->pieces[i];
1409                memmove( &t->pieces[i],
1410                         &t->pieces[i + 1],
1411                         sizeof( struct weighted_piece ) * ( newpos ) );
1412                t->pieces[i + newpos] = piece;
1413            }
1414        }
1415    }
1416
1417    assertWeightedPiecesAreSorted( t );
1418    *numgot = got;
1419}
1420
1421tr_bool
1422tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1423                          const tr_peer     * peer,
1424                          tr_block_index_t    block )
1425{
1426    const Torrent * t = tor->torrentPeers;
1427    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1428}
1429
1430/* cancel requests that are too old */
1431static void
1432refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1433{
1434    time_t now;
1435    time_t too_old;
1436    tr_torrent * tor;
1437    tr_peerMgr * mgr = vmgr;
1438    managerLock( mgr );
1439
1440    now = tr_time( );
1441    too_old = now - REQUEST_TTL_SECS;
1442
1443    tor = NULL;
1444    while(( tor = tr_torrentNext( mgr->session, tor )))
1445    {
1446        Torrent * t = tor->torrentPeers;
1447        const int n = t->requestCount;
1448        if( n > 0 )
1449        {
1450            int keepCount = 0;
1451            int cancelCount = 0;
1452            struct block_request * cancel = tr_new( struct block_request, n );
1453            const struct block_request * it;
1454            const struct block_request * end;
1455
1456            for( it=t->requests, end=it+n; it!=end; ++it )
1457            {
1458                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1459                    cancel[cancelCount++] = *it;
1460                else
1461                {
1462                    if( it != &t->requests[keepCount] )
1463                        t->requests[keepCount] = *it;
1464                    keepCount++;
1465                }
1466            }
1467
1468            /* prune out the ones we aren't keeping */
1469            t->requestCount = keepCount;
1470
1471            /* send cancel messages for all the "cancel" ones */
1472            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1473                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1474                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1475                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1476                    decrementPendingReqCount( it );
1477                }
1478            }
1479
1480            /* decrement the pending request counts for the timed-out blocks */
1481            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1482                pieceListRemoveRequest( t, it->block );
1483
1484            /* cleanup loop */
1485            tr_free( cancel );
1486        }
1487    }
1488
1489    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1490    managerUnlock( mgr );
1491}
1492
1493static void
1494addStrike( Torrent * t, tr_peer * peer )
1495{
1496    tordbg( t, "increasing peer %s strike count to %d",
1497            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1498
1499    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1500    {
1501        struct peer_atom * atom = peer->atom;
1502        atom->flags2 |= MYFLAG_BANNED;
1503        peer->doPurge = 1;
1504        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1505    }
1506}
1507
1508static void
1509gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1510{
1511    tr_torrent *   tor = t->tor;
1512    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1513
1514    tor->corruptCur += byteCount;
1515    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1516
1517    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1518}
1519
1520static void
1521peerSuggestedPiece( Torrent            * t UNUSED,
1522                    tr_peer            * peer UNUSED,
1523                    tr_piece_index_t     pieceIndex UNUSED,
1524                    int                  isFastAllowed UNUSED )
1525{
1526#if 0
1527    assert( t );
1528    assert( peer );
1529    assert( peer->msgs );
1530
1531    /* is this a valid piece? */
1532    if(  pieceIndex >= t->tor->info.pieceCount )
1533        return;
1534
1535    /* don't ask for it if we've already got it */
1536    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1537        return;
1538
1539    /* don't ask for it if they don't have it */
1540    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1541        return;
1542
1543    /* don't ask for it if we're choked and it's not fast */
1544    if( !isFastAllowed && peer->clientIsChoked )
1545        return;
1546
1547    /* request the blocks that we don't have in this piece */
1548    {
1549        tr_block_index_t block;
1550        const tr_torrent * tor = t->tor;
1551        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1552        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1553
1554        for( block=start; block<end; ++block )
1555        {
1556            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1557            {
1558                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1559                const uint32_t length = tr_torBlockCountBytes( tor, block );
1560                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1561                incrementPieceRequests( t, pieceIndex );
1562            }
1563        }
1564    }
1565#endif
1566}
1567
1568static void
1569removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1570{
1571    requestListRemove( t, block, peer );
1572    pieceListRemoveRequest( t, block );
1573}
1574
1575/* peer choked us, or maybe it disconnected.
1576   either way we need to remove all its requests */
1577static void
1578peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1579{
1580    int i, n;
1581    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1582
1583    for( i=n=0; i<t->requestCount; ++i )
1584        if( peer == t->requests[i].peer )
1585            blocks[n++] = t->requests[i].block;
1586
1587    for( i=0; i<n; ++i )
1588        removeRequestFromTables( t, blocks[i], peer );
1589
1590    tr_free( blocks );
1591}
1592
1593static void
1594peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1595{
1596    Torrent * t = vt;
1597
1598    torrentLock( t );
1599
1600    switch( e->eventType )
1601    {
1602        case TR_PEER_PEER_GOT_DATA:
1603        {
1604            const time_t now = tr_time( );
1605            tr_torrent * tor = t->tor;
1606
1607            if( e->wasPieceData )
1608            {
1609                tor->uploadedCur += e->length;
1610                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1611                tr_torrentSetActivityDate( tor, now );
1612                tr_torrentSetDirty( tor );
1613            }
1614
1615            /* update the stats */
1616            if( e->wasPieceData )
1617                tr_statsAddUploaded( tor->session, e->length );
1618
1619            /* update our atom */
1620            if( peer && e->wasPieceData )
1621                peer->atom->piece_data_time = now;
1622
1623            break;
1624        }
1625
1626        case TR_PEER_CLIENT_GOT_HAVE:
1627            if( replicationExists( t ) ) {
1628                tr_incrReplicationOfPiece( t, e->pieceIndex );
1629                assertReplicationCountIsExact( t );
1630            }
1631            break;
1632
1633        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1634            if( replicationExists( t ) ) {
1635                tr_incrReplication( t );
1636                assertReplicationCountIsExact( t );
1637            }
1638            break;
1639
1640        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1641            /* noop */
1642            break;
1643
1644        case TR_PEER_CLIENT_GOT_BITFIELD:
1645            assert( e->bitfield != NULL );
1646            if( replicationExists( t ) ) {
1647                tr_incrReplicationFromBitfield( t, e->bitfield );
1648                assertReplicationCountIsExact( t );
1649            }
1650            break;
1651
1652        case TR_PEER_CLIENT_GOT_REJ:
1653            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1654            break;
1655
1656        case TR_PEER_CLIENT_GOT_CHOKE:
1657            peerDeclinedAllRequests( t, peer );
1658            break;
1659
1660        case TR_PEER_CLIENT_GOT_PORT:
1661            if( peer )
1662                peer->atom->port = e->port;
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_SUGGEST:
1666            if( peer )
1667                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1668            break;
1669
1670        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1671            if( peer )
1672                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1673            break;
1674
1675        case TR_PEER_CLIENT_GOT_DATA:
1676        {
1677            const time_t now = tr_time( );
1678            tr_torrent * tor = t->tor;
1679
1680            if( e->wasPieceData )
1681            {
1682                tor->downloadedCur += e->length;
1683                tr_torrentSetActivityDate( tor, now );
1684                tr_torrentSetDirty( tor );
1685            }
1686
1687            /* update the stats */
1688            if( e->wasPieceData )
1689                tr_statsAddDownloaded( tor->session, e->length );
1690
1691            /* update our atom */
1692            if( peer && peer->atom && e->wasPieceData )
1693                peer->atom->piece_data_time = now;
1694
1695            break;
1696        }
1697
1698        case TR_PEER_PEER_PROGRESS:
1699        {
1700            if( peer )
1701            {
1702                struct peer_atom * atom = peer->atom;
1703                if( e->progress >= 1.0 ) {
1704                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1705                    atomSetSeed( atom );
1706                }
1707            }
1708            break;
1709        }
1710
1711        case TR_PEER_CLIENT_GOT_BLOCK:
1712        {
1713            tr_torrent * tor = t->tor;
1714            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1715            int i, peerCount;
1716            tr_peer ** peers;
1717            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1718
1719            removeRequestFromTables( t, block, peer );
1720            getBlockRequestPeers( t, block, &peerArr );
1721            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1722
1723            /* remove additional block requests and send cancel to peers */
1724            for( i=0; i<peerCount; i++ ) {
1725                tr_peer * p = peers[i];
1726                assert( p != peer );
1727                if( p->msgs ) {
1728                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1729                    tr_peerMsgsCancel( p->msgs, block );
1730                }
1731                removeRequestFromTables( t, block, p );
1732            }
1733
1734            tr_ptrArrayDestruct( &peerArr, FALSE );
1735
1736            if( peer && peer->blocksSentToClient )
1737                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1738
1739            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1740            {
1741                /* we already have this block... */
1742                const uint32_t n = tr_torBlockCountBytes( tor, block );
1743                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1744                tordbg( t, "we have this block already..." );
1745            }
1746            else
1747            {
1748                tr_cpBlockAdd( &tor->completion, block );
1749                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1750                tr_torrentSetDirty( tor );
1751
1752                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1753                {
1754                    const tr_piece_index_t p = e->pieceIndex;
1755                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1756
1757                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1758
1759                    if( !ok )
1760                    {
1761                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1762                                   (unsigned long)p );
1763                    }
1764
1765                    tr_peerMgrSetBlame( tor, p, ok );
1766
1767                    if( !ok )
1768                    {
1769                        gotBadPiece( t, p );
1770                    }
1771                    else
1772                    {
1773                        int i;
1774                        int peerCount;
1775                        tr_peer ** peers;
1776                        tr_file_index_t fileIndex;
1777
1778                        /* only add this to downloadedCur if we got it from a peer --
1779                         * webseeds shouldn't count against our ratio. As one tracker
1780                         * admin put it, "Those pieces are downloaded directly from the
1781                         * content distributor, not the peers, it is the tracker's job
1782                         * to manage the swarms, not the web server and does not fit
1783                         * into the jurisdiction of the tracker." */
1784                        if( peer->msgs != NULL ) {
1785                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1786                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1787                        }
1788
1789                        peerCount = tr_ptrArraySize( &t->peers );
1790                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1791                        for( i=0; i<peerCount; ++i )
1792                            tr_peerMsgsHave( peers[i]->msgs, p );
1793
1794                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1795                            const tr_file * file = &tor->info.files[fileIndex];
1796                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1797                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1798                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1799                                    tr_torrentFileCompleted( tor, fileIndex );
1800                                }
1801                            }
1802                        }
1803
1804                        pieceListRemovePiece( t, p );
1805                    }
1806                }
1807
1808                t->needsCompletenessCheck = TRUE;
1809            }
1810            break;
1811        }
1812
1813        case TR_PEER_ERROR:
1814            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1815            {
1816                /* some protocol error from the peer */
1817                peer->doPurge = 1;
1818                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1819                        tr_atomAddrStr( peer->atom ) );
1820            }
1821            else
1822            {
1823                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1824            }
1825            break;
1826
1827        default:
1828            assert( 0 );
1829    }
1830
1831    torrentUnlock( t );
1832}
1833
1834static int
1835getDefaultShelfLife( uint8_t from )
1836{
1837    /* in general, peers obtained from firsthand contact
1838     * are better than those from secondhand, etc etc */
1839    switch( from )
1840    {
1841        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1842        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1843        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1844        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1845        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1846        case TR_PEER_FROM_RESUME   : return 60 * 60;
1847        case TR_PEER_FROM_LPD      : return 10 * 60;
1848        default                    : return 60 * 60;
1849    }
1850}
1851
1852static void
1853ensureAtomExists( Torrent           * t,
1854                  const tr_address  * addr,
1855                  const tr_port       port,
1856                  const uint8_t       flags,
1857                  const int8_t        seedProbability,
1858                  const uint8_t       from )
1859{
1860    struct peer_atom * a;
1861
1862    assert( tr_isAddress( addr ) );
1863    assert( from < TR_PEER_FROM__MAX );
1864
1865    a = getExistingAtom( t, addr );
1866
1867    if( a == NULL )
1868    {
1869        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1870        a = tr_new0( struct peer_atom, 1 );
1871        a->addr = *addr;
1872        a->port = port;
1873        a->flags = flags;
1874        a->fromFirst = from;
1875        a->fromBest = from;
1876        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1877        a->blocklisted = -1;
1878        atomSetSeedProbability( a, seedProbability );
1879        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1880
1881        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1882    }
1883    else
1884    {
1885        if( from < a->fromBest )
1886            a->fromBest = from;
1887       
1888        if( a->seedProbability == -1 )
1889            atomSetSeedProbability( a, seedProbability );
1890
1891        a->flags |= flags;
1892    }
1893}
1894
1895static int
1896getMaxPeerCount( const tr_torrent * tor )
1897{
1898    return tor->maxConnectedPeers;
1899}
1900
1901static int
1902getPeerCount( const Torrent * t )
1903{
1904    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1905}
1906
1907/* FIXME: this is kind of a mess. */
1908static tr_bool
1909myHandshakeDoneCB( tr_handshake  * handshake,
1910                   tr_peerIo     * io,
1911                   tr_bool         readAnythingFromPeer,
1912                   tr_bool         isConnected,
1913                   const uint8_t * peer_id,
1914                   void          * vmanager )
1915{
1916    tr_bool            ok = isConnected;
1917    tr_bool            success = FALSE;
1918    tr_port            port;
1919    const tr_address * addr;
1920    tr_peerMgr       * manager = vmanager;
1921    Torrent          * t;
1922    tr_handshake     * ours;
1923
1924    assert( io );
1925    assert( tr_isBool( ok ) );
1926
1927    t = tr_peerIoHasTorrentHash( io )
1928        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1929        : NULL;
1930
1931    if( tr_peerIoIsIncoming ( io ) )
1932        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1933                                        handshake, handshakeCompare );
1934    else if( t )
1935        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1936                                        handshake, handshakeCompare );
1937    else
1938        ours = handshake;
1939
1940    assert( ours );
1941    assert( ours == handshake );
1942
1943    if( t )
1944        torrentLock( t );
1945
1946    addr = tr_peerIoGetAddress( io, &port );
1947
1948    if( !ok || !t || !t->isRunning )
1949    {
1950        if( t )
1951        {
1952            struct peer_atom * atom = getExistingAtom( t, addr );
1953            if( atom )
1954            {
1955                ++atom->numFails;
1956
1957                if( !readAnythingFromPeer )
1958                {
1959                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1960                    atom->flags2 |= MYFLAG_UNREACHABLE;
1961                }
1962            }
1963        }
1964    }
1965    else /* looking good */
1966    {
1967        struct peer_atom * atom;
1968
1969        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1970        atom = getExistingAtom( t, addr );
1971        atom->time = tr_time( );
1972        atom->piece_data_time = 0;
1973        atom->lastConnectionAt = tr_time( );
1974
1975        if( !tr_peerIoIsIncoming( io ) )
1976        {
1977            atom->flags |= ADDED_F_CONNECTABLE;
1978            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1979        }
1980
1981        /* In principle, this flag specifies whether the peer groks uTP,
1982           not whether it's currently connected over uTP. */
1983        if( io->utp_socket )
1984            atom->flags |= ADDED_F_UTP_FLAGS;
1985
1986        if( atom->flags2 & MYFLAG_BANNED )
1987        {
1988            tordbg( t, "banned peer %s tried to reconnect",
1989                    tr_atomAddrStr( atom ) );
1990        }
1991        else if( tr_peerIoIsIncoming( io )
1992               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1993
1994        {
1995        }
1996        else
1997        {
1998            tr_peer * peer = atom->peer;
1999
2000            if( peer ) /* we already have this peer */
2001            {
2002            }
2003            else
2004            {
2005                peer = getPeer( t, atom );
2006                tr_free( peer->client );
2007
2008                if( !peer_id )
2009                    peer->client = NULL;
2010                else {
2011                    char client[128];
2012                    tr_clientForId( client, sizeof( client ), peer_id );
2013                    peer->client = tr_strdup( client );
2014                }
2015
2016                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2017                                                                balanced by our unref in peerDestructor()  */
2018                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2019                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2020
2021                success = TRUE;
2022            }
2023        }
2024    }
2025
2026    if( t )
2027        torrentUnlock( t );
2028
2029    return success;
2030}
2031
2032void
2033tr_peerMgrAddIncoming( tr_peerMgr * manager,
2034                       tr_address * addr,
2035                       tr_port      port,
2036                       int          socket,
2037                       struct UTPSocket * utp_socket )
2038{
2039    tr_session * session;
2040
2041    managerLock( manager );
2042
2043    assert( tr_isSession( manager->session ) );
2044    session = manager->session;
2045
2046    if( tr_sessionIsAddressBlocked( session, addr ) )
2047    {
2048        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2049        if(socket >= 0)
2050            tr_netClose( session, socket );
2051#ifdef WITH_UTP
2052        else
2053            UTP_Close( utp_socket );
2054#endif
2055    }
2056    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2057    {
2058        if(socket >= 0)
2059            tr_netClose( session, socket );
2060#ifdef WITH_UTP
2061        else
2062            UTP_Close( utp_socket );
2063#endif
2064    }
2065    else /* we don't have a connection to them yet... */
2066    {
2067        tr_peerIo *    io;
2068        tr_handshake * handshake;
2069
2070        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2071
2072        handshake = tr_handshakeNew( io,
2073                                     session->encryptionMode,
2074                                     myHandshakeDoneCB,
2075                                     manager );
2076
2077        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2078
2079        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2080                                 handshakeCompare );
2081    }
2082
2083    managerUnlock( manager );
2084}
2085
2086void
2087tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2088                  const tr_pex * pex, int8_t seedProbability )
2089{
2090    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2091    {
2092        Torrent * t = tor->torrentPeers;
2093        managerLock( t->manager );
2094
2095        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2096            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2097                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2098
2099        managerUnlock( t->manager );
2100    }
2101}
2102
2103void
2104tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2105{
2106    Torrent * t = tor->torrentPeers;
2107    const int n = tr_ptrArraySize( &t->pool );
2108    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2109    struct peer_atom ** end = it + n;
2110
2111    while( it != end )
2112        atomSetSeed( *it++ );
2113}
2114
2115tr_pex *
2116tr_peerMgrCompactToPex( const void *    compact,
2117                        size_t          compactLen,
2118                        const uint8_t * added_f,
2119                        size_t          added_f_len,
2120                        size_t *        pexCount )
2121{
2122    size_t          i;
2123    size_t          n = compactLen / 6;
2124    const uint8_t * walk = compact;
2125    tr_pex *        pex = tr_new0( tr_pex, n );
2126
2127    for( i = 0; i < n; ++i )
2128    {
2129        pex[i].addr.type = TR_AF_INET;
2130        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2131        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2132        if( added_f && ( n == added_f_len ) )
2133            pex[i].flags = added_f[i];
2134    }
2135
2136    *pexCount = n;
2137    return pex;
2138}
2139
2140tr_pex *
2141tr_peerMgrCompact6ToPex( const void    * compact,
2142                         size_t          compactLen,
2143                         const uint8_t * added_f,
2144                         size_t          added_f_len,
2145                         size_t        * pexCount )
2146{
2147    size_t          i;
2148    size_t          n = compactLen / 18;
2149    const uint8_t * walk = compact;
2150    tr_pex *        pex = tr_new0( tr_pex, n );
2151
2152    for( i = 0; i < n; ++i )
2153    {
2154        pex[i].addr.type = TR_AF_INET6;
2155        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2156        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2157        if( added_f && ( n == added_f_len ) )
2158            pex[i].flags = added_f[i];
2159    }
2160
2161    *pexCount = n;
2162    return pex;
2163}
2164
2165tr_pex *
2166tr_peerMgrArrayToPex( const void * array,
2167                      size_t       arrayLen,
2168                      size_t      * pexCount )
2169{
2170    size_t          i;
2171    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2172    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2173    const uint8_t * walk = array;
2174    tr_pex        * pex = tr_new0( tr_pex, n );
2175
2176    for( i = 0 ; i < n ; i++ ) {
2177        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2178        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2179        pex[i].flags = 0x00;
2180        walk += sizeof( tr_address ) + 2;
2181    }
2182
2183    *pexCount = n;
2184    return pex;
2185}
2186
2187/**
2188***
2189**/
2190
2191void
2192tr_peerMgrSetBlame( tr_torrent     * tor,
2193                    tr_piece_index_t pieceIndex,
2194                    int              success )
2195{
2196    if( !success )
2197    {
2198        int        peerCount, i;
2199        Torrent *  t = tor->torrentPeers;
2200        tr_peer ** peers;
2201
2202        assert( torrentIsLocked( t ) );
2203
2204        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2205        for( i = 0; i < peerCount; ++i )
2206        {
2207            tr_peer * peer = peers[i];
2208            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2209            {
2210                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2211                        tr_atomAddrStr( peer->atom ),
2212                        pieceIndex, (int)peer->strikes + 1 );
2213                addStrike( t, peer );
2214            }
2215        }
2216    }
2217}
2218
2219int
2220tr_pexCompare( const void * va, const void * vb )
2221{
2222    const tr_pex * a = va;
2223    const tr_pex * b = vb;
2224    int i;
2225
2226    assert( tr_isPex( a ) );
2227    assert( tr_isPex( b ) );
2228
2229    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2230        return i;
2231
2232    if( a->port != b->port )
2233        return a->port < b->port ? -1 : 1;
2234
2235    return 0;
2236}
2237
2238#if 0
2239static int
2240peerPrefersCrypto( const tr_peer * peer )
2241{
2242    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2243        return TRUE;
2244
2245    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2246        return FALSE;
2247
2248    return tr_peerIoIsEncrypted( peer->io );
2249}
2250#endif
2251
2252/* better goes first */
2253static int
2254compareAtomsByUsefulness( const void * va, const void *vb )
2255{
2256    const struct peer_atom * a = * (const struct peer_atom**) va;
2257    const struct peer_atom * b = * (const struct peer_atom**) vb;
2258
2259    assert( tr_isAtom( a ) );
2260    assert( tr_isAtom( b ) );
2261
2262    if( a->piece_data_time != b->piece_data_time )
2263        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2264    if( a->fromBest != b->fromBest )
2265        return a->fromBest < b->fromBest ? -1 : 1;
2266    if( a->numFails != b->numFails )
2267        return a->numFails < b->numFails ? -1 : 1;
2268
2269    return 0;
2270}
2271
2272int
2273tr_peerMgrGetPeers( tr_torrent   * tor,
2274                    tr_pex      ** setme_pex,
2275                    uint8_t        af,
2276                    uint8_t        list_mode,
2277                    int            maxCount )
2278{
2279    int i;
2280    int n;
2281    int count = 0;
2282    int atomCount = 0;
2283    const Torrent * t = tor->torrentPeers;
2284    struct peer_atom ** atoms = NULL;
2285    tr_pex * pex;
2286    tr_pex * walk;
2287
2288    assert( tr_isTorrent( tor ) );
2289    assert( setme_pex != NULL );
2290    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2291    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2292
2293    managerLock( t->manager );
2294
2295    /**
2296    ***  build a list of atoms
2297    **/
2298
2299    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2300    {
2301        int i;
2302        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2303        atomCount = tr_ptrArraySize( &t->peers );
2304        atoms = tr_new( struct peer_atom *, atomCount );
2305        for( i=0; i<atomCount; ++i )
2306            atoms[i] = peers[i]->atom;
2307    }
2308    else /* TR_PEERS_ALL */
2309    {
2310        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2311        atomCount = tr_ptrArraySize( &t->pool );
2312        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2313    }
2314
2315    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2316
2317    /**
2318    ***  add the first N of them into our return list
2319    **/
2320
2321    n = MIN( atomCount, maxCount );
2322    pex = walk = tr_new0( tr_pex, n );
2323
2324    for( i=0; i<atomCount && count<n; ++i )
2325    {
2326        const struct peer_atom * atom = atoms[i];
2327        if( atom->addr.type == af )
2328        {
2329            assert( tr_isAddress( &atom->addr ) );
2330            walk->addr = atom->addr;
2331            walk->port = atom->port;
2332            walk->flags = atom->flags;
2333            ++count;
2334            ++walk;
2335        }
2336    }
2337
2338    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2339
2340    assert( ( walk - pex ) == count );
2341    *setme_pex = pex;
2342
2343    /* cleanup */
2344    tr_free( atoms );
2345    managerUnlock( t->manager );
2346    return count;
2347}
2348
2349static void atomPulse      ( int, short, void * );
2350static void bandwidthPulse ( int, short, void * );
2351static void rechokePulse   ( int, short, void * );
2352static void reconnectPulse ( int, short, void * );
2353
2354static struct event *
2355createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2356{
2357    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2358    tr_timerAddMsec( timer, msec );
2359    return timer;
2360}
2361
2362static void
2363ensureMgrTimersExist( struct tr_peerMgr * m )
2364{
2365    if( m->atomTimer == NULL )
2366        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2367
2368    if( m->bandwidthTimer == NULL )
2369        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2370
2371    if( m->rechokeTimer == NULL )
2372        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2373
2374    if( m->refillUpkeepTimer == NULL )
2375        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2376}
2377
2378void
2379tr_peerMgrStartTorrent( tr_torrent * tor )
2380{
2381    Torrent * t = tor->torrentPeers;
2382
2383    assert( tr_isTorrent( tor ) );
2384    assert( tr_torrentIsLocked( tor ) );
2385
2386    ensureMgrTimersExist( t->manager );
2387
2388    t->isRunning = TRUE;
2389    t->maxPeers = t->tor->maxConnectedPeers;
2390    t->pieceSortState = PIECES_UNSORTED;
2391
2392    rechokePulse( 0, 0, t->manager );
2393}
2394
2395static void
2396stopTorrent( Torrent * t )
2397{
2398    int i, n;
2399
2400    t->isRunning = FALSE;
2401
2402    replicationFree( t );
2403    invalidatePieceSorting( t );
2404
2405    /* disconnect the peers. */
2406    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2407        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2408    tr_ptrArrayClear( &t->peers );
2409
2410    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2411     * which removes the handshake from t->outgoingHandshakes... */
2412    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2413        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2414}
2415
2416void
2417tr_peerMgrStopTorrent( tr_torrent * tor )
2418{
2419    assert( tr_isTorrent( tor ) );
2420    assert( tr_torrentIsLocked( tor ) );
2421
2422    stopTorrent( tor->torrentPeers );
2423}
2424
2425void
2426tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2427{
2428    assert( tr_isTorrent( tor ) );
2429    assert( tr_torrentIsLocked( tor ) );
2430    assert( tor->torrentPeers == NULL );
2431
2432    tor->torrentPeers = torrentConstructor( manager, tor );
2433}
2434
2435void
2436tr_peerMgrRemoveTorrent( tr_torrent * tor )
2437{
2438    assert( tr_isTorrent( tor ) );
2439    assert( tr_torrentIsLocked( tor ) );
2440
2441    stopTorrent( tor->torrentPeers );
2442    torrentDestructor( tor->torrentPeers );
2443}
2444
2445void
2446tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2447{
2448    assert( tr_isTorrent( tor ) );
2449    assert( torrentIsLocked( tor->torrentPeers ) );
2450    assert( tab != NULL );
2451    assert( tabCount > 0 );
2452
2453    memset( tab, 0, tabCount );
2454
2455    if( tr_torrentHasMetadata( tor ) )
2456    {
2457        tr_piece_index_t i;
2458        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2459        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2460        const float interval = tor->info.pieceCount / (float)tabCount;
2461        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2462
2463        for( i=0; i<tabCount; ++i )
2464        {
2465            const int piece = i * interval;
2466
2467            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2468                tab[i] = -1;
2469            else if( peerCount ) {
2470                int j;
2471                for( j=0; j<peerCount; ++j )
2472                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2473                        ++tab[i];
2474            }
2475        }
2476    }
2477}
2478
2479/* Returns the pieces that are available from peers */
2480tr_bitfield*
2481tr_peerMgrGetAvailable( const tr_torrent * tor )
2482{
2483    int i;
2484    Torrent * t = tor->torrentPeers;
2485    const int peerCount = tr_ptrArraySize( &t->peers );
2486    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2487    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2488
2489    assert( tr_torrentIsLocked( tor ) );
2490
2491    for( i=0; i<peerCount; ++i )
2492        tr_bitsetOr( pieces, &peers[i]->have );
2493
2494    return pieces;
2495}
2496
2497void
2498tr_peerMgrTorrentStats( tr_torrent  * tor,
2499                        int         * setmePeersKnown,
2500                        int         * setmePeersConnected,
2501                        int         * setmeSeedsConnected,
2502                        int         * setmeWebseedsSendingToUs,
2503                        int         * setmePeersSendingToUs,
2504                        int         * setmePeersGettingFromUs,
2505                        int         * setmePeersFrom )
2506{
2507    int i, size;
2508    const Torrent * t = tor->torrentPeers;
2509    const tr_peer ** peers;
2510
2511    assert( tr_torrentIsLocked( tor ) );
2512
2513    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2514    size = tr_ptrArraySize( &t->peers );
2515
2516    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2517    *setmePeersConnected       = 0;
2518    *setmeSeedsConnected       = 0;
2519    *setmePeersGettingFromUs   = 0;
2520    *setmePeersSendingToUs     = 0;
2521    *setmeWebseedsSendingToUs  = 0;
2522
2523    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2524        setmePeersFrom[i] = 0;
2525
2526    for( i=0; i<size; ++i )
2527    {
2528        const tr_peer * peer = peers[i];
2529        const struct peer_atom * atom = peer->atom;
2530
2531        if( peer->io == NULL ) /* not connected */
2532            continue;
2533
2534        ++*setmePeersConnected;
2535
2536        ++setmePeersFrom[atom->fromFirst];
2537
2538        if( clientIsDownloadingFrom( tor, peer ) )
2539            ++*setmePeersSendingToUs;
2540
2541        if( clientIsUploadingTo( peer ) )
2542            ++*setmePeersGettingFromUs;
2543
2544        if( atomIsSeed( atom ) )
2545            ++*setmeSeedsConnected;
2546    }
2547
2548    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2549}
2550
2551int
2552tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2553{
2554    int i;
2555    int tmp;
2556    int ret = 0;
2557
2558    const Torrent * t = tor->torrentPeers;
2559    const int n = tr_ptrArraySize( &t->webseeds );
2560    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2561
2562    for( i=0; i<n; ++i )
2563        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2564            ret += tmp;
2565
2566    return ret;
2567}
2568
2569
2570double*
2571tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2572{
2573    int i;
2574    const Torrent * t = tor->torrentPeers;
2575    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2576    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2577    const uint64_t now = tr_time_msec( );
2578    double * ret = tr_new0( double, webseedCount );
2579
2580    assert( tr_isTorrent( tor ) );
2581    assert( tr_torrentIsLocked( tor ) );
2582    assert( t->manager != NULL );
2583    assert( webseedCount == tor->info.webseedCount );
2584
2585    for( i=0; i<webseedCount; ++i ) {
2586        int Bps;
2587        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2588            ret[i] = Bps / (double)tr_speed_K;
2589        else
2590            ret[i] = -1.0;
2591    }
2592
2593    return ret;
2594}
2595
2596int
2597tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2598{
2599    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2600}
2601
2602
2603struct tr_peer_stat *
2604tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2605{
2606    int i;
2607    const Torrent * t = tor->torrentPeers;
2608    const int size = tr_ptrArraySize( &t->peers );
2609    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2610    const uint64_t now_msec = tr_time_msec( );
2611    const time_t now = tr_time();
2612    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2613
2614    assert( tr_isTorrent( tor ) );
2615    assert( tr_torrentIsLocked( tor ) );
2616    assert( t->manager );
2617
2618    for( i=0; i<size; ++i )
2619    {
2620        char *                   pch;
2621        const tr_peer *          peer = peers[i];
2622        const struct peer_atom * atom = peer->atom;
2623        tr_peer_stat *           stat = ret + i;
2624
2625        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2626        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2627                   sizeof( stat->client ) );
2628        stat->port                = ntohs( peer->atom->port );
2629        stat->from                = atom->fromFirst;
2630        stat->progress            = peer->progress;
2631        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2632        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2633        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2634        stat->peerIsChoked        = peer->peerIsChoked;
2635        stat->peerIsInterested    = peer->peerIsInterested;
2636        stat->clientIsChoked      = peer->clientIsChoked;
2637        stat->clientIsInterested  = peer->clientIsInterested;
2638        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2639        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2640        stat->isUploadingTo       = clientIsUploadingTo( peer );
2641        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2642
2643        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2644        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2645        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2646        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2647
2648        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2649        stat->pendingReqsToClient = peer->pendingReqsToClient;
2650
2651        pch = stat->flagStr;
2652        if( peer->io->utp_socket != NULL) *pch++ = 'T';
2653        if( t->optimistic == peer ) *pch++ = 'O';
2654        if( stat->isDownloadingFrom ) *pch++ = 'D';
2655        else if( stat->clientIsInterested ) *pch++ = 'd';
2656        if( stat->isUploadingTo ) *pch++ = 'U';
2657        else if( stat->peerIsInterested ) *pch++ = 'u';
2658        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2659        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2660        if( stat->isEncrypted ) *pch++ = 'E';
2661        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2662        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2663        if( stat->isIncoming ) *pch++ = 'I';
2664        *pch = '\0';
2665    }
2666
2667    *setmeCount = size;
2668
2669    return ret;
2670}
2671
2672/**
2673***
2674**/
2675
2676void
2677tr_peerMgrClearInterest( tr_torrent * tor )
2678{
2679    int i;
2680    Torrent * t = tor->torrentPeers;
2681    const int peerCount = tr_ptrArraySize( &t->peers );
2682
2683    assert( tr_isTorrent( tor ) );
2684    assert( tr_torrentIsLocked( tor ) );
2685
2686    for( i=0; i<peerCount; ++i )
2687    {
2688        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2689        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2690    }
2691}
2692
2693/* do we still want this piece and does the peer have it? */
2694static tr_bool
2695isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2696{
2697    return ( !tor->info.pieces[index].dnd ) /* we want it */
2698        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2699        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2700}
2701
2702/* does this peer have any pieces that we want? */
2703static tr_bool
2704isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2705{
2706    tr_piece_index_t i, n;
2707
2708    if ( tr_torrentIsSeed( tor ) )
2709        return FALSE;
2710
2711    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2712        return FALSE;
2713
2714    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2715        if( isPieceInteresting( tor, peer, i ) )
2716            return TRUE;
2717
2718    return FALSE;
2719}
2720
2721/* determines who we send "interested" messages to */
2722static void
2723rechokeDownloads( Torrent * t )
2724{
2725    int i;
2726    const time_t now = tr_time( );
2727    const int MIN_INTERESTING_PEERS = 5;
2728    const int peerCount = tr_ptrArraySize( &t->peers );
2729    int maxPeers;
2730
2731    int badCount         = 0;
2732    int goodCount        = 0;
2733    int untestedCount    = 0;
2734    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2735    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2736    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2737
2738    /* decide how many peers to be interested in */
2739    {
2740        int blocks = 0;
2741        int cancels = 0;
2742        time_t timeSinceCancel;
2743
2744        /* Count up how many blocks & cancels each peer has.
2745         *
2746         * There are two situations where we send out cancels --
2747         *
2748         * 1. We've got unresponsive peers, which is handled by deciding
2749         *    -which- peers to be interested in.
2750         *
2751         * 2. We've hit our bandwidth cap, which is handled by deciding
2752         *    -how many- peers to be interested in.
2753         *
2754         * We're working on 2. here, so we need to ignore unresponsive
2755         * peers in our calculations lest they confuse Transmission into
2756         * thinking it's hit its bandwidth cap.
2757         */
2758        for( i=0; i<peerCount; ++i )
2759        {
2760            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2761            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2762            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2763
2764            if( b == 0 ) /* ignore unresponsive peers, as described above */
2765                continue;
2766
2767            blocks += b;
2768            cancels += c;
2769        }
2770
2771        if( cancels > 0 )
2772        {
2773            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2774             * higher values indicate more congestion. */
2775            const double cancelRate = cancels / (double)(cancels + blocks);
2776            const double mult = 1 - MIN( cancelRate, 0.5 );
2777            maxPeers = t->interestedCount * mult;
2778            tordbg( t, "cancel rate is %.3f -- reducing the "
2779                       "number of peers we're interested in by %.0f percent",
2780                       cancelRate, mult * 100 );
2781            t->lastCancel = now;
2782        }
2783
2784        timeSinceCancel = now - t->lastCancel;
2785        if( timeSinceCancel )
2786        {
2787            const int maxIncrease = 15;
2788            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2789            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2790            const int inc = maxIncrease * mult;
2791            maxPeers = t->maxPeers + inc;
2792            tordbg( t, "time since last cancel is %li -- increasing the "
2793                       "number of peers we're interested in by %d",
2794                       timeSinceCancel, inc );
2795        }
2796    }
2797
2798    /* don't let the previous section's number tweaking go too far... */
2799    if( maxPeers < MIN_INTERESTING_PEERS )
2800        maxPeers = MIN_INTERESTING_PEERS;
2801    if( maxPeers > t->tor->maxConnectedPeers )
2802        maxPeers = t->tor->maxConnectedPeers;
2803
2804    t->maxPeers = maxPeers;
2805
2806    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2807     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2808     * That's the order in which we'll choose who to show interest in */
2809    {
2810        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2811        int n = peerCount;
2812        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2813
2814        while( n > 0 )
2815        {
2816            const int i = tr_cryptoWeakRandInt( n );
2817            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2818
2819            if( !isPeerInteresting( t->tor, peer ) )
2820            {
2821                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2822            }
2823            else
2824            {
2825                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2826                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2827
2828                if( !blocks && !cancels )
2829                    untested[untestedCount++] = peer;
2830                else if( !cancels )
2831                    good[goodCount++] = peer;
2832                else if( !blocks )
2833                    bad[badCount++] = peer;
2834                else if( ( cancels * 10 ) < blocks )
2835                    good[goodCount++] = peer;
2836                else
2837                    bad[badCount++] = peer;
2838            }
2839
2840            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2841        }
2842
2843        tr_free( peers );
2844    }
2845
2846    t->interestedCount = 0;
2847
2848    /* We've decided (1) how many peers to be interested in,
2849     * and (2) which peers are the best candidates,
2850     * Now it's time to update our `interest' flags. */
2851    for( i=0; i<goodCount; ++i ) {
2852        const tr_bool b = t->interestedCount < maxPeers;
2853        tr_peerMsgsSetInterested( good[i]->msgs, b );
2854        if( b )
2855            ++t->interestedCount;
2856    }
2857    for( i=0; i<untestedCount; ++i ) {
2858        const tr_bool b = t->interestedCount < maxPeers;
2859        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2860        if( b )
2861            ++t->interestedCount;
2862    }
2863    for( i=0; i<badCount; ++i ) {
2864        const tr_bool b = t->interestedCount < maxPeers;
2865        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2866        if( b )
2867            ++t->interestedCount;
2868    }
2869
2870/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2871
2872    /* cleanup */
2873    tr_free( untested );
2874    tr_free( good );
2875    tr_free( bad );
2876}
2877
2878/**
2879***
2880**/
2881
2882struct ChokeData
2883{
2884    tr_bool         isInterested;
2885    tr_bool         wasChoked;
2886    tr_bool         isChoked;
2887    int             rate;
2888    int             salt;
2889    tr_peer *       peer;
2890};
2891
2892static int
2893compareChoke( const void * va,
2894              const void * vb )
2895{
2896    const struct ChokeData * a = va;
2897    const struct ChokeData * b = vb;
2898
2899    if( a->rate != b->rate ) /* prefer higher overall speeds */
2900        return a->rate > b->rate ? -1 : 1;
2901
2902    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2903        return a->wasChoked ? 1 : -1;
2904
2905    if( a->salt != b->salt ) /* random order */
2906        return a->salt - b->salt;
2907
2908    return 0;
2909}
2910
2911/* is this a new connection? */
2912static int
2913isNew( const tr_peer * peer )
2914{
2915    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2916}
2917
2918/* get a rate for deciding which peers to choke and unchoke. */
2919static int
2920getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2921{
2922    int Bps;
2923
2924    if( tr_torrentIsSeed( tor ) )
2925        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2926
2927    /* downloading a private torrent... take upload speed into account
2928     * because there may only be a small window of opportunity to share */
2929    else if( tr_torrentIsPrivate( tor ) )
2930        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2931            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2932
2933    /* downloading a public torrent */
2934    else
2935        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2936
2937    /* convert it to bytes per second */
2938    return Bps;
2939}
2940
2941static inline tr_bool
2942isBandwidthMaxedOut( const tr_bandwidth * b,
2943                     const uint64_t now_msec, tr_direction dir )
2944{
2945    if( !tr_bandwidthIsLimited( b, dir ) )
2946        return FALSE;
2947    else {
2948        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2949        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2950        return got >= want;
2951    }
2952}
2953
2954static void
2955rechokeUploads( Torrent * t, const uint64_t now )
2956{
2957    int i, size, unchokedInterested;
2958    const int peerCount = tr_ptrArraySize( &t->peers );
2959    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2960    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2961    const tr_session * session = t->manager->session;
2962    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2963    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2964
2965    assert( torrentIsLocked( t ) );
2966
2967    /* an optimistic unchoke peer's "optimistic"
2968     * state lasts for N calls to rechokeUploads(). */
2969    if( t->optimisticUnchokeTimeScaler > 0 )
2970        t->optimisticUnchokeTimeScaler--;
2971    else
2972        t->optimistic = NULL;
2973
2974    /* sort the peers by preference and rate */
2975    for( i = 0, size = 0; i < peerCount; ++i )
2976    {
2977        tr_peer * peer = peers[i];
2978        struct peer_atom * atom = peer->atom;
2979
2980        if( peer->progress >= 1.0 ) /* choke all seeds */
2981        {
2982            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2983        }
2984        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2985        {
2986            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2987        }
2988        else if( chokeAll ) /* choke everyone if we're not uploading */
2989        {
2990            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2991        }
2992        else if( peer != t->optimistic )
2993        {
2994            struct ChokeData * n = &choke[size++];
2995            n->peer         = peer;
2996            n->isInterested = peer->peerIsInterested;
2997            n->wasChoked    = peer->peerIsChoked;
2998            n->rate         = getRate( t->tor, atom, now );
2999            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3000            n->isChoked     = TRUE;
3001        }
3002    }
3003
3004    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3005
3006    /**
3007     * Reciprocation and number of uploads capping is managed by unchoking
3008     * the N peers which have the best upload rate and are interested.
3009     * This maximizes the client's download rate. These N peers are
3010     * referred to as downloaders, because they are interested in downloading
3011     * from the client.
3012     *
3013     * Peers which have a better upload rate (as compared to the downloaders)
3014     * but aren't interested get unchoked. If they become interested, the
3015     * downloader with the worst upload rate gets choked. If a client has
3016     * a complete file, it uses its upload rate rather than its download
3017     * rate to decide which peers to unchoke.
3018     *
3019     * If our bandwidth is maxed out, don't unchoke any more peers.
3020     */
3021    unchokedInterested = 0;
3022    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3023        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3024        if( choke[i].isInterested )
3025            ++unchokedInterested;
3026    }
3027
3028    /* optimistic unchoke */
3029    if( !t->optimistic && !isMaxedOut && (i<size) )
3030    {
3031        int n;
3032        struct ChokeData * c;
3033        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3034
3035        for( ; i<size; ++i )
3036        {
3037            if( choke[i].isInterested )
3038            {
3039                const tr_peer * peer = choke[i].peer;
3040                int x = 1, y;
3041                if( isNew( peer ) ) x *= 3;
3042                for( y=0; y<x; ++y )
3043                    tr_ptrArrayAppend( &randPool, &choke[i] );
3044            }
3045        }
3046
3047        if(( n = tr_ptrArraySize( &randPool )))
3048        {
3049            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3050            c->isChoked = FALSE;
3051            t->optimistic = c->peer;
3052            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3053        }
3054
3055        tr_ptrArrayDestruct( &randPool, NULL );
3056    }
3057
3058    for( i=0; i<size; ++i )
3059        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3060
3061    /* cleanup */
3062    tr_free( choke );
3063}
3064
3065static void
3066rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3067{
3068    tr_torrent * tor = NULL;
3069    tr_peerMgr * mgr = vmgr;
3070    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3071
3072    managerLock( mgr );
3073
3074    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3075        if( tor->isRunning ) {
3076            rechokeUploads( tor->torrentPeers, now );
3077            if( !tr_torrentIsSeed( tor ) )
3078                rechokeDownloads( tor->torrentPeers );
3079        }
3080    }
3081
3082    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3083    managerUnlock( mgr );
3084}
3085
3086/***
3087****
3088****  Life and Death
3089****
3090***/
3091
3092typedef enum
3093{
3094    TR_CAN_KEEP,
3095    TR_CAN_CLOSE,
3096    TR_MUST_CLOSE,
3097}
3098tr_close_type_t;
3099
3100static tr_close_type_t
3101shouldPeerBeClosed( const Torrent    * t,
3102                    const tr_peer    * peer,
3103                    int                peerCount,
3104                    const time_t       now )
3105{
3106    const tr_torrent *       tor = t->tor;
3107    const struct peer_atom * atom = peer->atom;
3108
3109    /* if it's marked for purging, close it */
3110    if( peer->doPurge )
3111    {
3112        tordbg( t, "purging peer %s because its doPurge flag is set",
3113                tr_atomAddrStr( atom ) );
3114        return TR_MUST_CLOSE;
3115    }
3116
3117    /* if we're seeding and the peer has everything we have,
3118     * and enough time has passed for a pex exchange, then disconnect */
3119    if( tr_torrentIsSeed( tor ) )
3120    {
3121        tr_bool peerHasEverything;
3122
3123        if( atom->seedProbability != -1 )
3124        {
3125            peerHasEverything = atomIsSeed( atom );
3126        }
3127        else
3128        {
3129            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
3130            tr_bitsetDifference( tmp, &peer->have );
3131            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
3132            tr_bitfieldFree( tmp );
3133        }
3134
3135        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
3136        {
3137            tordbg( t, "purging peer %s because we're both seeds",
3138                    tr_atomAddrStr( atom ) );
3139            return TR_MUST_CLOSE;
3140        }
3141    }
3142
3143    /* disconnect if it's been too long since piece data has been transferred.
3144     * this is on a sliding scale based on number of available peers... */
3145    {
3146        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3147        /* if we have >= relaxIfFewerThan, strictness is 100%.
3148         * if we have zero connections, strictness is 0% */
3149        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3150                               ? 1.0
3151                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3152        const int lo = MIN_UPLOAD_IDLE_SECS;
3153        const int hi = MAX_UPLOAD_IDLE_SECS;
3154        const int limit = hi - ( ( hi - lo ) * strictness );
3155        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3156/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3157        if( idleTime > limit ) {
3158            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3159                       tr_atomAddrStr( atom ), idleTime );
3160            return TR_CAN_CLOSE;
3161        }
3162    }
3163
3164    return TR_CAN_KEEP;
3165}
3166
3167static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
3168
3169static tr_peer **
3170getPeersToClose( Torrent * t, tr_close_type_t closeType,
3171                 const uint64_t now_msec, const time_t now_sec,
3172                 int * setmeSize )
3173{
3174    int i, peerCount, outsize;
3175    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3176    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3177
3178    assert( torrentIsLocked( t ) );
3179
3180    for( i = outsize = 0; i < peerCount; ++i )
3181        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) == closeType )
3182            ret[outsize++] = peers[i];
3183
3184    sortPeersByLivelinessReverse ( ret, NULL, outsize, now_msec );
3185
3186    *setmeSize = outsize;
3187    return ret;
3188}
3189
3190static int
3191getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3192{
3193    int sec;
3194
3195    /* if we were recently connected to this peer and transferring piece
3196     * data, try to reconnect to them sooner rather that later -- we don't
3197     * want network troubles to get in the way of a good peer. */
3198    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3199        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3200
3201    /* don't allow reconnects more often than our minimum */
3202    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3203        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3204
3205    /* otherwise, the interval depends on how many times we've tried
3206     * and failed to connect to the peer */
3207    else switch( atom->numFails ) {
3208        case 0: sec = 0; break;
3209        case 1: sec = 5; break;
3210        case 2: sec = 2 * 60; break;
3211        case 3: sec = 15 * 60; break;
3212        case 4: sec = 30 * 60; break;
3213        case 5: sec = 60 * 60; break;
3214        default: sec = 120 * 60; break;
3215    }
3216
3217    /* penalize peers that were unreachable the last time we tried */
3218    if( atom->flags2 & MYFLAG_UNREACHABLE )
3219        sec += sec;
3220
3221    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3222    return sec;
3223}
3224
3225static void
3226removePeer( Torrent * t, tr_peer * peer )
3227{
3228    tr_peer * removed;
3229    struct peer_atom * atom = peer->atom;
3230
3231    assert( torrentIsLocked( t ) );
3232    assert( atom );
3233
3234    atom->time = tr_time( );
3235
3236    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3237
3238    if( replicationExists( t ) )
3239        tr_decrReplicationFromBitset( t, &peer->have );
3240
3241    assert( removed == peer );
3242    peerDestructor( t, removed );
3243}
3244
3245static void
3246closePeer( Torrent * t, tr_peer * peer )
3247{
3248    struct peer_atom * atom;
3249
3250    assert( t != NULL );
3251    assert( peer != NULL );
3252
3253    atom = peer->atom;
3254
3255    /* if we transferred piece data, then they might be good peers,
3256       so reset their `numFails' weight to zero. otherwise we connected
3257       to them fruitlessly, so mark it as another fail */
3258    if( atom->piece_data_time ) {
3259        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3260        atom->numFails = 0;
3261    } else {
3262        ++atom->numFails;
3263        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3264    }
3265
3266    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3267    removePeer( t, peer );
3268}
3269
3270static void
3271removeAllPeers( Torrent * t )
3272{
3273    while( !tr_ptrArrayEmpty( &t->peers ) )
3274        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3275}
3276
3277static void
3278closeBadPeers( Torrent * t, const uint64_t now_msec, const time_t now_sec )
3279{
3280    if( !t->isRunning )
3281    {
3282        removeAllPeers( t );
3283    }
3284    else
3285    {
3286        int i;
3287        int mustCloseCount;
3288        struct tr_peer ** mustClose;
3289
3290        /* disconnect the really bad peers */
3291        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now_msec, now_sec, &mustCloseCount );
3292        for( i=0; i<mustCloseCount; ++i )
3293            closePeer( t, mustClose[i] );
3294        tr_free( mustClose );
3295    }
3296}
3297
3298struct peer_liveliness
3299{
3300    tr_peer * peer;
3301    void * clientData;
3302    time_t pieceDataTime;
3303    time_t time;
3304    int speed;
3305    tr_bool doPurge;
3306};
3307
3308static int
3309comparePeerLiveliness( const void * va, const void * vb )
3310{
3311    const struct peer_liveliness * a = va;
3312    const struct peer_liveliness * b = vb;
3313
3314    if( a->doPurge != b->doPurge )
3315        return a->doPurge ? 1 : -1;
3316
3317    if( a->speed != b->speed ) /* faster goes first */
3318        return a->speed > b->speed ? -1 : 1;
3319
3320    /* the one to give us data more recently goes first */
3321    if( a->pieceDataTime != b->pieceDataTime )
3322        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3323
3324    /* the one we connected to most recently goes first */
3325    if( a->time != b->time )
3326        return a->time > b->time ? -1 : 1;
3327
3328    return 0;
3329}
3330
3331static int
3332comparePeerLivelinessReverse( const void * va, const void * vb )
3333{
3334    return -comparePeerLiveliness (va, vb);
3335}
3336
3337static void
3338sortPeersByLivelinessImpl( tr_peer  ** peers,
3339                           void     ** clientData,
3340                           int         n,
3341                           uint64_t    now,
3342                           int (*compare) ( const void *va, const void *vb ) )
3343{
3344    int i;
3345    struct peer_liveliness *lives, *l;
3346
3347    /* build a sortable array of peer + extra info */
3348    lives = l = tr_new0( struct peer_liveliness, n );
3349    for( i=0; i<n; ++i, ++l )
3350    {
3351        tr_peer * p = peers[i];
3352        l->peer = p;
3353        l->doPurge = p->doPurge;
3354        l->pieceDataTime = p->atom->piece_data_time;
3355        l->time = p->atom->time;
3356        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3357                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3358        if( clientData )
3359            l->clientData = clientData[i];
3360    }
3361
3362    /* sort 'em */
3363    assert( n == ( l - lives ) );
3364    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3365
3366    /* build the peer array */
3367    for( i=0, l=lives; i<n; ++i, ++l ) {
3368        peers[i] = l->peer;
3369        if( clientData )
3370            clientData[i] = l->clientData;
3371    }
3372    assert( n == ( l - lives ) );
3373
3374    /* cleanup */
3375    tr_free( lives );
3376}
3377
3378static void
3379sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3380{
3381    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3382}
3383
3384static void
3385sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3386{
3387    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
3388}
3389
3390
3391static void
3392enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3393{
3394    int n = tr_ptrArraySize( &t->peers );
3395    const int max = tr_torrentGetPeerLimit( t->tor );
3396    if( n > max )
3397    {
3398        void * base = tr_ptrArrayBase( &t->peers );
3399        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3400        sortPeersByLiveliness( peers, NULL, n, now );
3401        while( n > max )
3402            closePeer( t, peers[--n] );
3403        tr_free( peers );
3404    }
3405}
3406
3407static void
3408enforceSessionPeerLimit( tr_session * session, uint64_t now )
3409{
3410    int n = 0;
3411    tr_torrent * tor = NULL;
3412    const int max = tr_sessionGetPeerLimit( session );
3413
3414    /* count the total number of peers */
3415    while(( tor = tr_torrentNext( session, tor )))
3416        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3417
3418    /* if there are too many, prune out the worst */
3419    if( n > max )
3420    {
3421        tr_peer ** peers = tr_new( tr_peer*, n );
3422        Torrent ** torrents = tr_new( Torrent*, n );
3423
3424        /* populate the peer array */
3425        n = 0;
3426        tor = NULL;
3427        while(( tor = tr_torrentNext( session, tor ))) {
3428            int i;
3429            Torrent * t = tor->torrentPeers;
3430            const int tn = tr_ptrArraySize( &t->peers );
3431            for( i=0; i<tn; ++i, ++n ) {
3432                peers[n] = tr_ptrArrayNth( &t->peers, i );
3433                torrents[n] = t;
3434            }
3435        }
3436
3437        /* sort 'em */
3438        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3439
3440        /* cull out the crappiest */
3441        while( n-- > max )
3442            closePeer( torrents[n], peers[n] );
3443
3444        /* cleanup */
3445        tr_free( torrents );
3446        tr_free( peers );
3447    }
3448}
3449
3450static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3451
3452static void
3453reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3454{
3455    tr_torrent * tor;
3456    tr_peerMgr * mgr = vmgr;
3457    const time_t now_sec = tr_time( );
3458    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3459
3460    /**
3461    ***  enforce the per-session and per-torrent peer limits
3462    **/
3463
3464    /* if we're over the per-torrent peer limits, cull some peers */
3465    tor = NULL;
3466    while(( tor = tr_torrentNext( mgr->session, tor )))
3467        if( tor->isRunning )
3468            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3469
3470    /* if we're over the per-session peer limits, cull some peers */
3471    enforceSessionPeerLimit( mgr->session, now_msec );
3472
3473    /* remove crappy peers */
3474    tor = NULL;
3475    while(( tor = tr_torrentNext( mgr->session, tor )))
3476        closeBadPeers( tor->torrentPeers, now_msec, now_sec );
3477
3478    /* try to make new peer connections */
3479    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3480}
3481
3482/****
3483*****
3484*****  BANDWIDTH ALLOCATION
3485*****
3486****/
3487
3488static void
3489pumpAllPeers( tr_peerMgr * mgr )
3490{
3491    tr_torrent * tor = NULL;
3492
3493    while(( tor = tr_torrentNext( mgr->session, tor )))
3494    {
3495        int j;
3496        Torrent * t = tor->torrentPeers;
3497
3498        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3499        {
3500            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3501            tr_peerMsgsPulse( peer->msgs );
3502        }
3503    }
3504}
3505
3506static void
3507bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3508{
3509    tr_torrent * tor;
3510    tr_peerMgr * mgr = vmgr;
3511    managerLock( mgr );
3512
3513    /* FIXME: this next line probably isn't necessary... */
3514    pumpAllPeers( mgr );
3515
3516    /* allocate bandwidth to the peers */
3517    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3518    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3519
3520    /* possibly stop torrents that have seeded enough */
3521    tor = NULL;
3522    while(( tor = tr_torrentNext( mgr->session, tor )))
3523        tr_torrentCheckSeedLimit( tor );
3524
3525    /* run the completeness check for any torrents that need it */
3526    tor = NULL;
3527    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3528        if( tor->torrentPeers->needsCompletenessCheck ) {
3529            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3530            tr_torrentRecheckCompleteness( tor );
3531        }
3532    }
3533
3534    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3535     * during the peer-io callback call chain */
3536    tor = NULL;
3537    while(( tor = tr_torrentNext( mgr->session, tor )))
3538        if( tor->isStopping )
3539            tr_torrentStop( tor );
3540
3541    reconnectPulse( 0, 0, mgr );
3542
3543    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3544    managerUnlock( mgr );
3545}
3546
3547/***
3548****
3549***/
3550
3551static int
3552compareAtomPtrsByAddress( const void * va, const void *vb )
3553{
3554    const struct peer_atom * a = * (const struct peer_atom**) va;
3555    const struct peer_atom * b = * (const struct peer_atom**) vb;
3556
3557    assert( tr_isAtom( a ) );
3558    assert( tr_isAtom( b ) );
3559
3560    return tr_compareAddresses( &a->addr, &b->addr );
3561}
3562
3563/* best come first, worst go last */
3564static int
3565compareAtomPtrsByShelfDate( const void * va, const void *vb )
3566{
3567    time_t atime;
3568    time_t btime;
3569    const struct peer_atom * a = * (const struct peer_atom**) va;
3570    const struct peer_atom * b = * (const struct peer_atom**) vb;
3571    const int data_time_cutoff_secs = 60 * 60;
3572    const time_t tr_now = tr_time( );
3573
3574    assert( tr_isAtom( a ) );
3575    assert( tr_isAtom( b ) );
3576
3577    /* primary key: the last piece data time *if* it was within the last hour */
3578    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3579    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3580    if( atime != btime )
3581        return atime > btime ? -1 : 1;
3582
3583    /* secondary key: shelf date. */
3584    if( a->shelf_date != b->shelf_date )
3585        return a->shelf_date > b->shelf_date ? -1 : 1;
3586
3587    return 0;
3588}
3589
3590static int
3591getMaxAtomCount( const tr_torrent * tor )
3592{
3593    const int n = tor->maxConnectedPeers;
3594    /* approximate fit of the old jump discontinuous function */
3595    if( n >= 55 ) return     n + 150;
3596    if( n >= 20 ) return 2 * n + 95;
3597    return               4 * n + 55;
3598}
3599
3600static void
3601atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3602{
3603    tr_torrent * tor = NULL;
3604    tr_peerMgr * mgr = vmgr;
3605    managerLock( mgr );
3606
3607    while(( tor = tr_torrentNext( mgr->session, tor )))
3608    {
3609        int atomCount;
3610        Torrent * t = tor->torrentPeers;
3611        const int maxAtomCount = getMaxAtomCount( tor );
3612        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3613
3614        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3615        {
3616            int i;
3617            int keepCount = 0;
3618            int testCount = 0;
3619            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3620            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3621
3622            /* keep the ones that are in use */
3623            for( i=0; i<atomCount; ++i ) {
3624                struct peer_atom * atom = atoms[i];
3625                if( peerIsInUse( t, atom ) )
3626                    keep[keepCount++] = atom;
3627                else
3628                    test[testCount++] = atom;
3629            }
3630
3631            /* if there's room, keep the best of what's left */
3632            i = 0;
3633            if( keepCount < maxAtomCount ) {
3634                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3635                while( i<testCount && keepCount<maxAtomCount )
3636                    keep[keepCount++] = test[i++];
3637            }
3638
3639            /* free the culled atoms */
3640            while( i<testCount )
3641                tr_free( test[i++] );
3642
3643            /* rebuild Torrent.pool with what's left */
3644            tr_ptrArrayDestruct( &t->pool, NULL );
3645            t->pool = TR_PTR_ARRAY_INIT;
3646            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3647            for( i=0; i<keepCount; ++i )
3648                tr_ptrArrayAppend( &t->pool, keep[i] );
3649
3650            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3651
3652            /* cleanup */
3653            tr_free( test );
3654            tr_free( keep );
3655        }
3656    }
3657
3658    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3659    managerUnlock( mgr );
3660}
3661
3662/***
3663****
3664****
3665****
3666***/
3667
3668/* is this atom someone that we'd want to initiate a connection to? */
3669static tr_bool
3670isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3671{
3672    /* not if we're both seeds */
3673    if( tr_torrentIsSeed( tor ) )
3674        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3675            return FALSE;
3676
3677    /* not if we've already got a connection to them... */
3678    if( peerIsInUse( tor->torrentPeers, atom ) )
3679        return FALSE;
3680
3681    /* not if we just tried them already */
3682    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3683        return FALSE;
3684
3685    /* not if they're blocklisted */
3686    if( isAtomBlocklisted( tor->session, atom ) )
3687        return FALSE;
3688
3689    /* not if they're banned... */
3690    if( atom->flags2 & MYFLAG_BANNED )
3691        return FALSE;
3692
3693    return TRUE;
3694}
3695
3696struct peer_candidate
3697{
3698    uint64_t score;
3699    tr_torrent * tor;
3700    struct peer_atom * atom;
3701};
3702
3703static tr_bool
3704torrentWasRecentlyStarted( const tr_torrent * tor )
3705{
3706    return difftime( tr_time( ), tor->startDate ) < 120;
3707}
3708
3709static inline uint64_t
3710addValToKey( uint64_t value, int width, uint64_t addme )
3711{
3712    value = (value << (uint64_t)width);
3713    value |= addme;
3714    return value;
3715}
3716
3717/* smaller value is better */
3718static uint64_t
3719getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3720{
3721    uint64_t i;
3722    uint64_t score = 0;
3723    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3724
3725    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3726    i = failed ? 1 : 0;
3727    score = addValToKey( score, 1, i );
3728
3729    /* prefer the one we attempted least recently (to cycle through all peers) */
3730    i = atom->lastConnectionAttemptAt;
3731    score = addValToKey( score, 32, i );
3732
3733    /* prefer peers belonging to a torrent of a higher priority */
3734    switch( tr_torrentGetPriority( tor ) ) {
3735        case TR_PRI_HIGH:    i = 0; break;
3736        case TR_PRI_NORMAL:  i = 1; break;
3737        case TR_PRI_LOW:     i = 2; break;
3738    }
3739    score = addValToKey( score, 4, i );
3740
3741    /* prefer recently-started torrents */
3742    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3743    score = addValToKey( score, 1, i );
3744
3745    /* prefer torrents we're downloading with */
3746    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3747    score = addValToKey( score, 1, i );
3748
3749    /* prefer peers that are known to be connectible */
3750    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3751    score = addValToKey( score, 1, i );
3752
3753    /* prefer peers that we might have a chance of uploading to...
3754       so lower seed probability is better */
3755    if( atom->seedProbability == 100 ) i = 101;
3756    else if( atom->seedProbability == -1 ) i = 100;
3757    else i = atom->seedProbability;
3758    score = addValToKey( score, 8, i );
3759
3760    /* Prefer peers that we got from more trusted sources.
3761     * lower `fromBest' values indicate more trusted sources */
3762    score = addValToKey( score, 4, atom->fromBest );
3763
3764    /* salt */
3765    score = addValToKey( score, 8, salt );
3766
3767    return score;
3768}
3769
3770/* sort an array of peer candidates */
3771static int
3772comparePeerCandidates( const void * va, const void * vb )
3773{
3774    const struct peer_candidate * a = va;
3775    const struct peer_candidate * b = vb;
3776
3777    if( a->score < b->score ) return -1;
3778    if( a->score > b->score ) return 1;
3779
3780    return 0;
3781}
3782
3783/** @return an array of all the atoms we might want to connect to */
3784static struct peer_candidate*
3785getPeerCandidates( tr_session * session, int * candidateCount )
3786{
3787    int n;
3788    tr_torrent * tor;
3789    struct peer_candidate * candidates;
3790    struct peer_candidate * walk;
3791    const time_t now = tr_time( );
3792    const uint64_t now_msec = tr_time_msec( );
3793    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3794    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3795
3796    /* don't start any new handshakes if we're full up */
3797    n = 0;
3798    tor= NULL;
3799    while(( tor = tr_torrentNext( session, tor )))
3800        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3801    if( maxCandidates <= n ) {
3802        *candidateCount = 0;
3803        return NULL;
3804    }
3805
3806    /* allocate an array of candidates */
3807    n = 0;
3808    tor= NULL;
3809    while(( tor = tr_torrentNext( session, tor )))
3810        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3811    walk = candidates = tr_new( struct peer_candidate, n );
3812
3813    /* populate the candidate array */
3814    tor = NULL;
3815    while(( tor = tr_torrentNext( session, tor )))
3816    {
3817        int i, nAtoms;
3818        struct peer_atom ** atoms;
3819
3820        if( !tor->torrentPeers->isRunning )
3821            continue;
3822
3823        /* if we've already got enough peers in this torrent... */
3824        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3825            continue;
3826
3827        /* if we've already got enough speed in this torrent... */
3828        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3829            continue;
3830
3831        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3832        for( i=0; i<nAtoms; ++i )
3833        {
3834            struct peer_atom * atom = atoms[i];
3835
3836            if( isPeerCandidate( tor, atom, now ) )
3837            {
3838                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3839                walk->tor = tor;
3840                walk->atom = atom;
3841                walk->score = getPeerCandidateScore( tor, atom, salt );
3842                ++walk;
3843            }
3844        }
3845    }
3846
3847    *candidateCount = walk - candidates;
3848    if( *candidateCount > 1 )
3849        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3850    return candidates;
3851}
3852
3853static void
3854initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3855{
3856    tr_peerIo * io;
3857    const time_t now = tr_time( );
3858    tr_bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3859
3860    if( atom->fromFirst == TR_PEER_FROM_PEX )
3861        /* PEX has explicit signalling for uTP support.  If an atom
3862           originally came from PEX and doesn't have the uTP flag, skip the
3863           uTP connection attempt.  Are we being optimistic here? */
3864        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3865
3866    tordbg( t, "Starting an OUTGOING%s connection with %s",
3867            utp ? " µTP" : "",
3868            tr_atomAddrStr( atom ) );
3869
3870    io = tr_peerIoNewOutgoing( mgr->session,
3871                               mgr->session->bandwidth,
3872                               &atom->addr,
3873                               atom->port,
3874                               t->tor->info.hash,
3875                               t->tor->completeness == TR_SEED,
3876                               utp );
3877
3878    if( io == NULL )
3879    {
3880        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3881                tr_atomAddrStr( atom ) );
3882        atom->flags2 |= MYFLAG_UNREACHABLE;
3883        atom->numFails++;
3884    }
3885    else
3886    {
3887        tr_handshake * handshake = tr_handshakeNew( io,
3888                                                    mgr->session->encryptionMode,
3889                                                    myHandshakeDoneCB,
3890                                                    mgr );
3891
3892        assert( tr_peerIoGetTorrentHash( io ) );
3893
3894        tr_peerIoUnref( io ); /* balanced by the initial ref
3895                                 in tr_peerIoNewOutgoing() */
3896
3897        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3898                                 handshakeCompare );
3899    }
3900
3901    atom->lastConnectionAttemptAt = now;
3902    atom->time = now;
3903}
3904
3905static void
3906initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3907{
3908#if 0
3909    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3910             tr_atomAddrStr( c->atom ),
3911             tr_torrentName( c->tor ),
3912             (int)c->atom->seedProbability,
3913             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3914             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3915#endif
3916
3917    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3918}
3919
3920static void
3921makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3922{
3923    int i, n;
3924    struct peer_candidate * candidates;
3925
3926    candidates = getPeerCandidates( mgr->session, &n );
3927
3928    for( i=0; i<n && i<max; ++i )
3929        initiateCandidateConnection( mgr, &candidates[i] );
3930
3931    tr_free( candidates );
3932}
Note: See TracBrowser for help on using the repository browser.