source: trunk/libtransmission/peer-mgr.c @ 12545

Last change on this file since 12545 was 12539, checked in by jordan, 10 years ago

(trunk libT) #4338 "improved webseed support" -- patch by alexat

  • Property svn:keywords set to Date Rev Author Id
File size: 115.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12539 2011-07-10 15:24:51Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* an optimistically unchoked peer is immune from rechoking
51       for this many calls to rechokeUploads(). */
52    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54    /* how frequently to reallocate bandwidth */
55    BANDWIDTH_PERIOD_MSEC = 500,
56
57    /* how frequently to age out old piece request lists */
58    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
59
60    /* how frequently to decide which peers live and die */
61    RECONNECT_PERIOD_MSEC = 500,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = ( 60 ),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
68
69    /* max number of peers to ask for per second overall.
70     * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 12,
72
73    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.flags2 */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.flags2 */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93    /** how long we'll let requests we've made linger before we cancel them */
94    REQUEST_TTL_SECS = 120,
95
96    NO_BLOCKS_CANCEL_HISTORY = 120,
97
98    CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
160struct block_request
161{
162    tr_block_index_t block;
163    tr_peer * peer;
164    time_t sentAt;
165};
166
167struct weighted_piece
168{
169    tr_piece_index_t index;
170    int16_t salt;
171    int16_t requestCount;
172};
173
174enum piece_sort_state
175{
176    PIECES_UNSORTED,
177    PIECES_SORTED_BY_INDEX,
178    PIECES_SORTED_BY_WEIGHT
179};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor;
190    struct tr_peerMgr        * manager;
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests;
199    int                        requestCount;
200    int                        requestAlloc;
201
202    struct weighted_piece    * pieces;
203    int                        pieceCount;
204    enum piece_sort_state      pieceSortState;
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, is contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
225struct tr_peerMgr
226{
227    tr_session    * session;
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
235#define tordbg( t, ... ) \
236    do { \
237        if( tr_deepLoggingIsActive( ) ) \
238            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
239    } while( 0 )
240
241#define dbgmsg( ... ) \
242    do { \
243        if( tr_deepLoggingIsActive( ) ) \
244            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
245    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378}
379
380static tr_peer*
381peerNew( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new( tr_peer, 1 );
384    tr_peerConstruct( peer );
385
386    peer->atom = atom;
387    atom->peer = peer;
388
389    return peer;
390}
391
392static tr_peer*
393getPeer( Torrent * torrent, struct peer_atom * atom )
394{
395    tr_peer * peer;
396
397    assert( torrentIsLocked( torrent ) );
398
399    peer = atom->peer;
400
401    if( peer == NULL )
402    {
403        peer = peerNew( atom );
404        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
405        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
406        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
407    }
408
409    return peer;
410}
411
412static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
413
414void
415tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
416{
417    assert( peer != NULL );
418
419    peerDeclinedAllRequests( tor->torrentPeers, peer );
420
421    if( peer->msgs != NULL )
422        tr_peerMsgsFree( peer->msgs );
423
424    if( peer->io ) {
425        tr_peerIoClear( peer->io );
426        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
427    }
428
429    tr_bitfieldDestruct( &peer->have );
430    tr_bitfieldDestruct( &peer->blame );
431    tr_free( peer->client );
432
433    if( peer->atom )
434        peer->atom->peer = NULL;
435}
436
437static void
438peerDelete( Torrent * t, tr_peer * peer )
439{
440    tr_peerDestruct( t->tor, peer );
441    tr_free( peer );
442}
443
444static bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentFree( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentNew( tr_peerMgr * manager, tr_torrent * tor )
511{
512    int       i;
513    Torrent * t;
514
515    t = tr_new0( Torrent, 1 );
516    t->manager = manager;
517    t->tor = tor;
518    t->pool = TR_PTR_ARRAY_INIT;
519    t->peers = TR_PTR_ARRAY_INIT;
520    t->webseeds = TR_PTR_ARRAY_INIT;
521    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
522
523    for( i = 0; i < tor->info.webseedCount; ++i )
524    {
525        tr_webseed * w =
526            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
527        tr_ptrArrayAppend( &t->webseeds, w );
528    }
529
530    return t;
531}
532
533tr_peerMgr*
534tr_peerMgrNew( tr_session * session )
535{
536    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
537    m->session = session;
538    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
539    return m;
540}
541
542static void
543deleteTimer( struct event ** t )
544{
545    if( *t != NULL )
546    {
547        event_free( *t );
548        *t = NULL;
549    }
550}
551
552static void
553deleteTimers( struct tr_peerMgr * m )
554{
555    deleteTimer( &m->atomTimer );
556    deleteTimer( &m->bandwidthTimer );
557    deleteTimer( &m->rechokeTimer );
558    deleteTimer( &m->refillUpkeepTimer );
559}
560
561void
562tr_peerMgrFree( tr_peerMgr * manager )
563{
564    managerLock( manager );
565
566    deleteTimers( manager );
567
568    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
569     * the item from manager->handshakes, so this is a little roundabout... */
570    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
571        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
572
573    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
574
575    managerUnlock( manager );
576    tr_free( manager );
577}
578
579static int
580clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
581{
582    if( !tr_torrentHasMetadata( tor ) )
583        return true;
584
585    return peer->clientIsInterested && !peer->clientIsChoked;
586}
587
588static int
589clientIsUploadingTo( const tr_peer * peer )
590{
591    return peer->peerIsInterested && !peer->peerIsChoked;
592}
593
594/***
595****
596***/
597
598void
599tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
600{
601    tr_torrent * tor = NULL;
602    tr_session * session = mgr->session;
603
604    /* we cache whether or not a peer is blocklisted...
605       since the blocklist has changed, erase that cached value */
606    while(( tor = tr_torrentNext( session, tor )))
607    {
608        int i;
609        Torrent * t = tor->torrentPeers;
610        const int n = tr_ptrArraySize( &t->pool );
611        for( i=0; i<n; ++i ) {
612            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
613            atom->blocklisted = -1;
614        }
615    }
616}
617
618static bool
619isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
620{
621    if( atom->blocklisted < 0 )
622        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
623
624    assert( tr_isBool( atom->blocklisted ) );
625    return atom->blocklisted;
626}
627
628
629/***
630****
631***/
632
633static void
634atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
635{
636    assert( atom != NULL );
637    assert( -1<=seedProbability && seedProbability<=100 );
638
639    atom->seedProbability = seedProbability;
640
641    if( seedProbability == 100 )
642        atom->flags |= ADDED_F_SEED_FLAG;
643    else if( seedProbability != -1 )
644        atom->flags &= ~ADDED_F_SEED_FLAG;
645}
646
647static inline bool
648atomIsSeed( const struct peer_atom * atom )
649{
650    return atom->seedProbability == 100;
651}
652
653static void
654atomSetSeed( const Torrent * t, struct peer_atom * atom )
655{
656    if( !atomIsSeed( atom ) )
657    {
658        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
659
660        atomSetSeedProbability( atom, 100 );
661    }
662}
663
664
665bool
666tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
667                      const tr_address  * addr )
668{
669    bool isSeed = false;
670    const Torrent * t = tor->torrentPeers;
671    const struct peer_atom * atom = getExistingAtom( t, addr );
672
673    if( atom )
674        isSeed = atomIsSeed( atom );
675
676    return isSeed;
677}
678
679void
680tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
681{
682    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
683
684    if( atom )
685        atom->flags |= ADDED_F_UTP_FLAGS;
686}
687
688void
689tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
690{
691    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
692
693    if( atom )
694        atom->utp_failed = failed;
695}
696
697
698/**
699***  REQUESTS
700***
701*** There are two data structures associated with managing block requests:
702***
703*** 1. Torrent::requests, an array of "struct block_request" which keeps
704***    track of which blocks have been requested, and when, and by which peers.
705***    This is list is used for (a) cancelling requests that have been pending
706***    for too long and (b) avoiding duplicate requests before endgame.
707***
708*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
709***    pieces that we want to request. It's used to decide which blocks to
710***    return next when tr_peerMgrGetBlockRequests() is called.
711**/
712
713/**
714*** struct block_request
715**/
716
717static int
718compareReqByBlock( const void * va, const void * vb )
719{
720    const struct block_request * a = va;
721    const struct block_request * b = vb;
722
723    /* primary key: block */
724    if( a->block < b->block ) return -1;
725    if( a->block > b->block ) return 1;
726
727    /* secondary key: peer */
728    if( a->peer < b->peer ) return -1;
729    if( a->peer > b->peer ) return 1;
730
731    return 0;
732}
733
734static void
735requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
736{
737    struct block_request key;
738
739    /* ensure enough room is available... */
740    if( t->requestCount + 1 >= t->requestAlloc )
741    {
742        const int CHUNK_SIZE = 128;
743        t->requestAlloc += CHUNK_SIZE;
744        t->requests = tr_renew( struct block_request,
745                                t->requests, t->requestAlloc );
746    }
747
748    /* populate the record we're inserting */
749    key.block = block;
750    key.peer = peer;
751    key.sentAt = tr_time( );
752
753    /* insert the request to our array... */
754    {
755        bool exact;
756        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
757                                       sizeof( struct block_request ),
758                                       compareReqByBlock, &exact );
759        assert( !exact );
760        memmove( t->requests + pos + 1,
761                 t->requests + pos,
762                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
763        t->requests[pos] = key;
764    }
765
766    if( peer != NULL )
767    {
768        ++peer->pendingReqsToPeer;
769        assert( peer->pendingReqsToPeer >= 0 );
770    }
771
772    /*fprintf( stderr, "added request of block %lu from peer %s... "
773                       "there are now %d block\n",
774                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
775}
776
777static struct block_request *
778requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
779{
780    struct block_request key;
781    key.block = block;
782    key.peer = (tr_peer*) peer;
783
784    return bsearch( &key, t->requests, t->requestCount,
785                    sizeof( struct block_request ),
786                    compareReqByBlock );
787}
788
789/**
790 * Find the peers are we currently requesting the block
791 * with index @a block from and append them to @a peerArr.
792 */
793static void
794getBlockRequestPeers( Torrent * t, tr_block_index_t block,
795                      tr_ptrArray * peerArr )
796{
797    bool exact;
798    int i, pos;
799    struct block_request key;
800
801    key.block = block;
802    key.peer = NULL;
803    pos = tr_lowerBound( &key, t->requests, t->requestCount,
804                         sizeof( struct block_request ),
805                         compareReqByBlock, &exact );
806
807    assert( !exact ); /* shouldn't have a request with .peer == NULL */
808
809    for( i = pos; i < t->requestCount; ++i )
810    {
811        if( t->requests[i].block != block )
812            break;
813        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
814    }
815}
816
817static void
818decrementPendingReqCount( const struct block_request * b )
819{
820    if( b->peer != NULL )
821        if( b->peer->pendingReqsToPeer > 0 )
822            --b->peer->pendingReqsToPeer;
823}
824
825static void
826requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
827{
828    const struct block_request * b = requestListLookup( t, block, peer );
829    if( b != NULL )
830    {
831        const int pos = b - t->requests;
832        assert( pos < t->requestCount );
833
834        decrementPendingReqCount( b );
835
836        tr_removeElementFromArray( t->requests,
837                                   pos,
838                                   sizeof( struct block_request ),
839                                   t->requestCount-- );
840
841        /*fprintf( stderr, "removing request of block %lu from peer %s... "
842                           "there are now %d block requests left\n",
843                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
844    }
845}
846
847static int
848countActiveWebseeds( const Torrent * t )
849{
850    int activeCount = 0;
851    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
852    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
853
854    for( ; w!=wend; ++w )
855        if( tr_webseedIsActive( *w ) )
856            ++activeCount;
857
858    return activeCount;
859}
860
861static bool
862testForEndgame( const Torrent * t )
863{
864    /* we consider ourselves to be in endgame if the number of bytes
865       we've got requested is >= the number of bytes left to download */
866    return ( t->requestCount * t->tor->blockSize )
867               >= tr_cpLeftUntilDone( &t->tor->completion );
868}
869
870static void
871updateEndgame( Torrent * t )
872{
873    assert( t->requestCount >= 0 );
874
875    if( !testForEndgame( t ) )
876    {
877        /* not in endgame */
878        t->endgame = 0;
879    }
880    else if( !t->endgame ) /* only recalculate when endgame first begins */
881    {
882        int numDownloading = 0;
883        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
884        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
885
886        /* add the active bittorrent peers... */
887        for( ; p!=pend; ++p )
888            if( (*p)->pendingReqsToPeer > 0 )
889                ++numDownloading;
890
891        /* add the active webseeds... */
892        numDownloading += countActiveWebseeds( t );
893
894        /* average number of pending requests per downloading peer */
895        t->endgame = t->requestCount / MAX( numDownloading, 1 );
896    }
897}
898
899
900/****
901*****
902*****  Piece List Manipulation / Accessors
903*****
904****/
905
906static inline void
907invalidatePieceSorting( Torrent * t )
908{
909    t->pieceSortState = PIECES_UNSORTED;
910}
911
912static const tr_torrent * weightTorrent;
913
914static const uint16_t * weightReplication;
915
916static void
917setComparePieceByWeightTorrent( Torrent * t )
918{
919    if( !replicationExists( t ) )
920        replicationNew( t );
921
922    weightTorrent = t->tor;
923    weightReplication = t->pieceReplication;
924}
925
926/* we try to create a "weight" s.t. high-priority pieces come before others,
927 * and that partially-complete pieces come before empty ones. */
928static int
929comparePieceByWeight( const void * va, const void * vb )
930{
931    const struct weighted_piece * a = va;
932    const struct weighted_piece * b = vb;
933    int ia, ib, missing, pending;
934    const tr_torrent * tor = weightTorrent;
935    const uint16_t * rep = weightReplication;
936
937    /* primary key: weight */
938    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
939    pending = a->requestCount;
940    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
941    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
942    pending = b->requestCount;
943    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
944    if( ia < ib ) return -1;
945    if( ia > ib ) return 1;
946
947    /* secondary key: higher priorities go first */
948    ia = tor->info.pieces[a->index].priority;
949    ib = tor->info.pieces[b->index].priority;
950    if( ia > ib ) return -1;
951    if( ia < ib ) return 1;
952
953    /* tertiary key: rarest first. */
954    ia = rep[a->index];
955    ib = rep[b->index];
956    if( ia < ib ) return -1;
957    if( ia > ib ) return 1;
958
959    /* quaternary key: random */
960    if( a->salt < b->salt ) return -1;
961    if( a->salt > b->salt ) return 1;
962
963    /* okay, they're equal */
964    return 0;
965}
966
967static int
968comparePieceByIndex( const void * va, const void * vb )
969{
970    const struct weighted_piece * a = va;
971    const struct weighted_piece * b = vb;
972    if( a->index < b->index ) return -1;
973    if( a->index > b->index ) return 1;
974    return 0;
975}
976
977static void
978pieceListSort( Torrent * t, enum piece_sort_state state )
979{
980    assert( state==PIECES_SORTED_BY_INDEX
981         || state==PIECES_SORTED_BY_WEIGHT );
982
983
984    if( state == PIECES_SORTED_BY_WEIGHT )
985    {
986        setComparePieceByWeightTorrent( t );
987        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
988    }
989    else
990        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
991
992    t->pieceSortState = state;
993}
994
995/**
996 * These functions are useful for testing, but too expensive for nightly builds.
997 * let's leave it disabled but add an easy hook to compile it back in
998 */
999#if 1
1000#define assertWeightedPiecesAreSorted(t)
1001#define assertReplicationCountIsExact(t)
1002#else
1003static void
1004assertWeightedPiecesAreSorted( Torrent * t )
1005{
1006    if( !t->endgame )
1007    {
1008        int i;
1009        setComparePieceByWeightTorrent( t );
1010        for( i=0; i<t->pieceCount-1; ++i )
1011            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1012    }
1013}
1014static void
1015assertReplicationCountIsExact( Torrent * t )
1016{
1017    /* This assert might fail due to errors of implementations in other
1018     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1019     * from a client. If a such a behavior is noticed,
1020     * a bug report should be filled to the faulty client. */
1021
1022    size_t piece_i;
1023    const uint16_t * rep = t->pieceReplication;
1024    const size_t piece_count = t->pieceReplicationSize;
1025    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1026    const int peer_count = tr_ptrArraySize( &t->peers );
1027
1028    assert( piece_count == t->tor->info.pieceCount );
1029
1030    for( piece_i=0; piece_i<piece_count; ++piece_i )
1031    {
1032        int peer_i;
1033        uint16_t r = 0;
1034
1035        for( peer_i=0; peer_i<peer_count; ++peer_i )
1036            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
1037                ++r;
1038
1039        assert( rep[piece_i] == r );
1040    }
1041}
1042#endif
1043
1044static struct weighted_piece *
1045pieceListLookup( Torrent * t, tr_piece_index_t index )
1046{
1047    int i;
1048
1049    for( i=0; i<t->pieceCount; ++i )
1050        if( t->pieces[i].index == index )
1051            return &t->pieces[i];
1052
1053    return NULL;
1054}
1055
1056static void
1057pieceListRebuild( Torrent * t )
1058{
1059
1060    if( !tr_torrentIsSeed( t->tor ) )
1061    {
1062        tr_piece_index_t i;
1063        tr_piece_index_t * pool;
1064        tr_piece_index_t poolCount = 0;
1065        const tr_torrent * tor = t->tor;
1066        const tr_info * inf = tr_torrentInfo( tor );
1067        struct weighted_piece * pieces;
1068        int pieceCount;
1069
1070        /* build the new list */
1071        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1072        for( i=0; i<inf->pieceCount; ++i )
1073            if( !inf->pieces[i].dnd )
1074                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1075                    pool[poolCount++] = i;
1076        pieceCount = poolCount;
1077        pieces = tr_new0( struct weighted_piece, pieceCount );
1078        for( i=0; i<poolCount; ++i ) {
1079            struct weighted_piece * piece = pieces + i;
1080            piece->index = pool[i];
1081            piece->requestCount = 0;
1082            piece->salt = tr_cryptoWeakRandInt( 4096 );
1083        }
1084
1085        /* if we already had a list of pieces, merge it into
1086         * the new list so we don't lose its requestCounts */
1087        if( t->pieces != NULL )
1088        {
1089            struct weighted_piece * o = t->pieces;
1090            struct weighted_piece * oend = o + t->pieceCount;
1091            struct weighted_piece * n = pieces;
1092            struct weighted_piece * nend = n + pieceCount;
1093
1094            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1095
1096            while( o!=oend && n!=nend ) {
1097                if( o->index < n->index )
1098                    ++o;
1099                else if( o->index > n->index )
1100                    ++n;
1101                else
1102                    *n++ = *o++;
1103            }
1104
1105            tr_free( t->pieces );
1106        }
1107
1108        t->pieces = pieces;
1109        t->pieceCount = pieceCount;
1110
1111        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1112
1113        /* cleanup */
1114        tr_free( pool );
1115    }
1116}
1117
1118static void
1119pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1120{
1121    struct weighted_piece * p;
1122
1123    if(( p = pieceListLookup( t, piece )))
1124    {
1125        const int pos = p - t->pieces;
1126
1127        tr_removeElementFromArray( t->pieces,
1128                                   pos,
1129                                   sizeof( struct weighted_piece ),
1130                                   t->pieceCount-- );
1131
1132        if( t->pieceCount == 0 )
1133        {
1134            tr_free( t->pieces );
1135            t->pieces = NULL;
1136        }
1137    }
1138}
1139
1140static void
1141pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1142{
1143    int pos;
1144    bool isSorted = true;
1145
1146    if( p == NULL )
1147        return;
1148
1149    /* is the torrent already sorted? */
1150    pos = p - t->pieces;
1151    setComparePieceByWeightTorrent( t );
1152    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1153        isSorted = false;
1154    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1155        isSorted = false;
1156
1157    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1158    {
1159       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1160       isSorted = true;
1161    }
1162
1163    /* if it's not sorted, move it around */
1164    if( !isSorted )
1165    {
1166        bool exact;
1167        const struct weighted_piece tmp = *p;
1168
1169        tr_removeElementFromArray( t->pieces,
1170                                   pos,
1171                                   sizeof( struct weighted_piece ),
1172                                   t->pieceCount-- );
1173
1174        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1175                             sizeof( struct weighted_piece ),
1176                             comparePieceByWeight, &exact );
1177
1178        memmove( &t->pieces[pos + 1],
1179                 &t->pieces[pos],
1180                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1181
1182        t->pieces[pos] = tmp;
1183    }
1184
1185    assertWeightedPiecesAreSorted( t );
1186}
1187
1188static void
1189pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1190{
1191    struct weighted_piece * p;
1192    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1193
1194    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1195    {
1196        --p->requestCount;
1197        pieceListResortPiece( t, p );
1198    }
1199}
1200
1201
1202/****
1203*****
1204*****  Replication count ( for rarest first policy )
1205*****
1206****/
1207
1208/**
1209 * Increase the replication count of this piece and sort it if the
1210 * piece list is already sorted
1211 */
1212static void
1213tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1214{
1215    assert( replicationExists( t ) );
1216    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1217
1218    /* One more replication of this piece is present in the swarm */
1219    ++t->pieceReplication[index];
1220
1221    /* we only resort the piece if the list is already sorted */
1222    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1223        pieceListResortPiece( t, pieceListLookup( t, index ) );
1224}
1225
1226/**
1227 * Increases the replication count of pieces present in the bitfield
1228 */
1229static void
1230tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1231{
1232    size_t i;
1233    uint16_t * rep = t->pieceReplication;
1234    const size_t n = t->tor->info.pieceCount;
1235
1236    assert( replicationExists( t ) );
1237
1238    for( i=0; i<n; ++i )
1239        if( tr_bitfieldHas( b, i ) )
1240            ++rep[i];
1241
1242    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1243        invalidatePieceSorting( t );
1244}
1245
1246/**
1247 * Increase the replication count of every piece
1248 */
1249static void
1250tr_incrReplication( Torrent * t )
1251{
1252    int i;
1253    const int n = t->pieceReplicationSize;
1254
1255    assert( replicationExists( t ) );
1256    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1257
1258    for( i=0; i<n; ++i )
1259        ++t->pieceReplication[i];
1260}
1261
1262/**
1263 * Decrease the replication count of pieces present in the bitset.
1264 */
1265static void
1266tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1267{
1268    int i;
1269    const int n = t->pieceReplicationSize;
1270
1271    assert( replicationExists( t ) );
1272    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1273
1274    if( tr_bitfieldHasAll( b ) )
1275    {
1276        for( i=0; i<n; ++i )
1277            --t->pieceReplication[i];
1278    }
1279    else if ( !tr_bitfieldHasNone( b ) )
1280    {
1281        for( i=0; i<n; ++i )
1282            if( tr_bitfieldHas( b, i ) )
1283                --t->pieceReplication[i];
1284
1285        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1286            invalidatePieceSorting( t );
1287    }
1288}
1289
1290/**
1291***
1292**/
1293
1294void
1295tr_peerMgrRebuildRequests( tr_torrent * tor )
1296{
1297    assert( tr_isTorrent( tor ) );
1298
1299    pieceListRebuild( tor->torrentPeers );
1300}
1301
1302void
1303tr_peerMgrGetNextRequests( tr_torrent           * tor,
1304                           tr_peer              * peer,
1305                           int                    numwant,
1306                           tr_block_index_t     * setme,
1307                           int                  * numgot,
1308                           bool                   get_intervals )
1309{
1310    int i;
1311    int got;
1312    Torrent * t;
1313    struct weighted_piece * pieces;
1314    const tr_bitfield * const have = &peer->have;
1315
1316    /* sanity clause */
1317    assert( tr_isTorrent( tor ) );
1318    assert( peer->clientIsInterested );
1319    assert( !peer->clientIsChoked );
1320    assert( numwant > 0 );
1321
1322    /* walk through the pieces and find blocks that should be requested */
1323    got = 0;
1324    t = tor->torrentPeers;
1325
1326    /* prep the pieces list */
1327    if( t->pieces == NULL )
1328        pieceListRebuild( t );
1329
1330    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1331        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1332
1333    assertReplicationCountIsExact( t );
1334    assertWeightedPiecesAreSorted( t );
1335
1336    updateEndgame( t );
1337    pieces = t->pieces;
1338    for( i=0; i<t->pieceCount && got<numwant; ++i )
1339    {
1340        struct weighted_piece * p = pieces + i;
1341
1342        /* if the peer has this piece that we want... */
1343        if( tr_bitfieldHas( have, p->index ) )
1344        {
1345            tr_block_index_t b;
1346            tr_block_index_t first;
1347            tr_block_index_t last;
1348            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1349
1350            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1351
1352            for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b )
1353            {
1354                int peerCount;
1355                tr_peer ** peers;
1356
1357                /* don't request blocks we've already got */
1358                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1359                    continue;
1360
1361                /* always add peer if this block has no peers yet */
1362                tr_ptrArrayClear( &peerArr );
1363                getBlockRequestPeers( t, b, &peerArr );
1364                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1365                if( peerCount != 0 )
1366                {
1367                    /* don't make a second block request until the endgame */
1368                    if( !t->endgame )
1369                        continue;
1370
1371                    /* don't have more than two peers requesting this block */
1372                    if( peerCount > 1 )
1373                        continue;
1374
1375                    /* don't send the same request to the same peer twice */
1376                    if( peer == peers[0] )
1377                        continue;
1378
1379                    /* in the endgame allow an additional peer to download a
1380                       block but only if the peer seems to be handling requests
1381                       relatively fast */
1382                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1383                        continue;
1384                }
1385
1386                /* update the caller's table */
1387                if( !get_intervals ) {
1388                    setme[got++] = b;
1389                }
1390                /* if intervals are requested two array entries are necessarry:
1391                   one for the interval's starting block and one for its end block */
1392                else if( got && setme[2 * got - 1] == b - 1 && b != first ) {
1393                    /* expand the last interval */
1394                    ++setme[2 * got - 1];
1395                }
1396                else {
1397                    /* begin a new interval */
1398                    setme[2 * got] = setme[2 * got + 1] = b;
1399                    ++got;
1400                }
1401
1402                /* update our own tables */
1403                requestListAdd( t, b, peer );
1404                ++p->requestCount;
1405            }
1406
1407            tr_ptrArrayDestruct( &peerArr, NULL );
1408        }
1409    }
1410
1411    /* In most cases we've just changed the weights of a small number of pieces.
1412     * So rather than qsort()ing the entire array, it's faster to apply an
1413     * adaptive insertion sort algorithm. */
1414    if( got > 0 )
1415    {
1416        /* not enough requests || last piece modified */
1417        if ( i == t->pieceCount ) --i;
1418
1419        setComparePieceByWeightTorrent( t );
1420        while( --i >= 0 )
1421        {
1422            bool exact;
1423
1424            /* relative position! */
1425            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1426                                              t->pieceCount - (i + 1),
1427                                              sizeof( struct weighted_piece ),
1428                                              comparePieceByWeight, &exact );
1429            if( newpos > 0 )
1430            {
1431                const struct weighted_piece piece = t->pieces[i];
1432                memmove( &t->pieces[i],
1433                         &t->pieces[i + 1],
1434                         sizeof( struct weighted_piece ) * ( newpos ) );
1435                t->pieces[i + newpos] = piece;
1436            }
1437        }
1438    }
1439
1440    assertWeightedPiecesAreSorted( t );
1441    *numgot = got;
1442}
1443
1444bool
1445tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1446                          const tr_peer     * peer,
1447                          tr_block_index_t    block )
1448{
1449    const Torrent * t = tor->torrentPeers;
1450    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1451}
1452
1453/* cancel requests that are too old */
1454static void
1455refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1456{
1457    time_t now;
1458    time_t too_old;
1459    tr_torrent * tor;
1460    int cancel_buflen = 0;
1461    struct block_request * cancel = NULL;
1462    tr_peerMgr * mgr = vmgr;
1463    managerLock( mgr );
1464
1465    now = tr_time( );
1466    too_old = now - REQUEST_TTL_SECS;
1467
1468    /* alloc the temporary "cancel" buffer */
1469    tor = NULL;
1470    while(( tor = tr_torrentNext( mgr->session, tor )))
1471        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1472    if( cancel_buflen > 0 )
1473        cancel = tr_new( struct block_request, cancel_buflen );
1474
1475    /* prune requests that are too old */
1476    tor = NULL;
1477    while(( tor = tr_torrentNext( mgr->session, tor )))
1478    {
1479        Torrent * t = tor->torrentPeers;
1480        const int n = t->requestCount;
1481        if( n > 0 )
1482        {
1483            int keepCount = 0;
1484            int cancelCount = 0;
1485            const struct block_request * it;
1486            const struct block_request * end;
1487
1488            for( it=t->requests, end=it+n; it!=end; ++it )
1489            {
1490                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1491                    cancel[cancelCount++] = *it;
1492                else
1493                {
1494                    if( it != &t->requests[keepCount] )
1495                        t->requests[keepCount] = *it;
1496                    keepCount++;
1497                }
1498            }
1499
1500            /* prune out the ones we aren't keeping */
1501            t->requestCount = keepCount;
1502
1503            /* send cancel messages for all the "cancel" ones */
1504            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1505                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1506                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1507                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1508                    decrementPendingReqCount( it );
1509                }
1510            }
1511
1512            /* decrement the pending request counts for the timed-out blocks */
1513            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1514                pieceListRemoveRequest( t, it->block );
1515        }
1516    }
1517
1518    tr_free( cancel );
1519    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1520    managerUnlock( mgr );
1521}
1522
1523static void
1524addStrike( Torrent * t, tr_peer * peer )
1525{
1526    tordbg( t, "increasing peer %s strike count to %d",
1527            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1528
1529    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1530    {
1531        struct peer_atom * atom = peer->atom;
1532        atom->flags2 |= MYFLAG_BANNED;
1533        peer->doPurge = 1;
1534        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1535    }
1536}
1537
1538static void
1539gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1540{
1541    tr_torrent *   tor = t->tor;
1542    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1543
1544    tor->corruptCur += byteCount;
1545    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1546
1547    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1548}
1549
1550static void
1551peerSuggestedPiece( Torrent            * t UNUSED,
1552                    tr_peer            * peer UNUSED,
1553                    tr_piece_index_t     pieceIndex UNUSED,
1554                    int                  isFastAllowed UNUSED )
1555{
1556#if 0
1557    assert( t );
1558    assert( peer );
1559    assert( peer->msgs );
1560
1561    /* is this a valid piece? */
1562    if(  pieceIndex >= t->tor->info.pieceCount )
1563        return;
1564
1565    /* don't ask for it if we've already got it */
1566    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1567        return;
1568
1569    /* don't ask for it if they don't have it */
1570    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1571        return;
1572
1573    /* don't ask for it if we're choked and it's not fast */
1574    if( !isFastAllowed && peer->clientIsChoked )
1575        return;
1576
1577    /* request the blocks that we don't have in this piece */
1578    {
1579        tr_block_index_t b;
1580        tr_block_index_t first;
1581        tr_block_index_t last;
1582        const tr_torrent * tor = t->tor;
1583
1584        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1585
1586        for( b=first; b<=last; ++b )
1587        {
1588            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1589            {
1590                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1591                const uint32_t length = tr_torBlockCountBytes( tor, b );
1592                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1593                incrementPieceRequests( t, pieceIndex );
1594            }
1595        }
1596    }
1597#endif
1598}
1599
1600static void
1601removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1602{
1603    requestListRemove( t, block, peer );
1604    pieceListRemoveRequest( t, block );
1605}
1606
1607/* peer choked us, or maybe it disconnected.
1608   either way we need to remove all its requests */
1609static void
1610peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1611{
1612    int i, n;
1613    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1614
1615    for( i=n=0; i<t->requestCount; ++i )
1616        if( peer == t->requests[i].peer )
1617            blocks[n++] = t->requests[i].block;
1618
1619    for( i=0; i<n; ++i )
1620        removeRequestFromTables( t, blocks[i], peer );
1621
1622    tr_free( blocks );
1623}
1624
1625static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1626
1627static void
1628peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1629{
1630    Torrent * t = vt;
1631
1632    torrentLock( t );
1633
1634    assert( peer != NULL );
1635
1636    switch( e->eventType )
1637    {
1638        case TR_PEER_PEER_GOT_DATA:
1639        {
1640            const time_t now = tr_time( );
1641            tr_torrent * tor = t->tor;
1642
1643            if( e->wasPieceData )
1644            {
1645                tor->uploadedCur += e->length;
1646                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1647                tr_torrentSetActivityDate( tor, now );
1648                tr_torrentSetDirty( tor );
1649            }
1650
1651            /* update the stats */
1652            if( e->wasPieceData )
1653                tr_statsAddUploaded( tor->session, e->length );
1654
1655            /* update our atom */
1656            if( peer->atom && e->wasPieceData )
1657                peer->atom->piece_data_time = now;
1658
1659            break;
1660        }
1661
1662        case TR_PEER_CLIENT_GOT_HAVE:
1663            if( replicationExists( t ) ) {
1664                tr_incrReplicationOfPiece( t, e->pieceIndex );
1665                assertReplicationCountIsExact( t );
1666            }
1667            break;
1668
1669        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1670            if( replicationExists( t ) ) {
1671                tr_incrReplication( t );
1672                assertReplicationCountIsExact( t );
1673            }
1674            break;
1675
1676        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1677            /* noop */
1678            break;
1679
1680        case TR_PEER_CLIENT_GOT_BITFIELD:
1681            assert( e->bitfield != NULL );
1682            if( replicationExists( t ) ) {
1683                tr_incrReplicationFromBitfield( t, e->bitfield );
1684                assertReplicationCountIsExact( t );
1685            }
1686            break;
1687
1688        case TR_PEER_CLIENT_GOT_REJ: {
1689            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1690            if( b < t->tor->blockCount )
1691                removeRequestFromTables( t, b, peer );
1692            else
1693                tordbg( t, "Peer %s sent an out-of-range reject message",
1694                           tr_atomAddrStr( peer->atom ) );
1695            break;
1696        }
1697
1698        case TR_PEER_CLIENT_GOT_CHOKE:
1699            peerDeclinedAllRequests( t, peer );
1700            break;
1701
1702        case TR_PEER_CLIENT_GOT_PORT:
1703            if( peer->atom )
1704                peer->atom->port = e->port;
1705            break;
1706
1707        case TR_PEER_CLIENT_GOT_SUGGEST:
1708            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1709            break;
1710
1711        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1712            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1713            break;
1714
1715        case TR_PEER_CLIENT_GOT_DATA:
1716        {
1717            const time_t now = tr_time( );
1718            tr_torrent * tor = t->tor;
1719
1720            if( e->wasPieceData )
1721            {
1722                tor->downloadedCur += e->length;
1723                tr_torrentSetActivityDate( tor, now );
1724                tr_torrentSetDirty( tor );
1725            }
1726
1727            /* update the stats */
1728            if( e->wasPieceData )
1729                tr_statsAddDownloaded( tor->session, e->length );
1730
1731            /* update our atom */
1732            if( peer->atom && e->wasPieceData )
1733                peer->atom->piece_data_time = now;
1734
1735            break;
1736        }
1737
1738        case TR_PEER_CLIENT_GOT_BLOCK:
1739        {
1740            tr_torrent * tor = t->tor;
1741            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1742            int i, peerCount;
1743            tr_peer ** peers;
1744            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1745
1746            removeRequestFromTables( t, block, peer );
1747            getBlockRequestPeers( t, block, &peerArr );
1748            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1749
1750            /* remove additional block requests and send cancel to peers */
1751            for( i=0; i<peerCount; i++ ) {
1752                tr_peer * p = peers[i];
1753                assert( p != peer );
1754                if( p->msgs ) {
1755                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1756                    tr_peerMsgsCancel( p->msgs, block );
1757                }
1758                removeRequestFromTables( t, block, p );
1759            }
1760
1761            tr_ptrArrayDestruct( &peerArr, false );
1762
1763            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1764
1765            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1766            {
1767                /* we already have this block... */
1768                const uint32_t n = tr_torBlockCountBytes( tor, block );
1769                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1770                tordbg( t, "we have this block already..." );
1771            }
1772            else
1773            {
1774                tr_cpBlockAdd( &tor->completion, block );
1775                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1776                tr_torrentSetDirty( tor );
1777
1778                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1779                {
1780                    const tr_piece_index_t p = e->pieceIndex;
1781                    const bool ok = tr_torrentCheckPiece( tor, p );
1782
1783                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1784
1785                    if( !ok )
1786                    {
1787                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1788                                   (unsigned long)p );
1789                    }
1790
1791                    tr_peerMgrSetBlame( tor, p, ok );
1792
1793                    if( !ok )
1794                    {
1795                        gotBadPiece( t, p );
1796                    }
1797                    else
1798                    {
1799                        int i;
1800                        int peerCount;
1801                        tr_peer ** peers;
1802                        tr_file_index_t fileIndex;
1803
1804                        /* only add this to downloadedCur if we got it from a peer --
1805                         * webseeds shouldn't count against our ratio. As one tracker
1806                         * admin put it, "Those pieces are downloaded directly from the
1807                         * content distributor, not the peers, it is the tracker's job
1808                         * to manage the swarms, not the web server and does not fit
1809                         * into the jurisdiction of the tracker." */
1810                        if( peer->msgs != NULL ) {
1811                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1812                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1813                        }
1814
1815                        peerCount = tr_ptrArraySize( &t->peers );
1816                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1817                        for( i=0; i<peerCount; ++i )
1818                            tr_peerMsgsHave( peers[i]->msgs, p );
1819
1820                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1821                            const tr_file * file = &tor->info.files[fileIndex];
1822                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1823                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1824                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1825                                    tr_torrentFileCompleted( tor, fileIndex );
1826                                }
1827                            }
1828                        }
1829
1830                        pieceListRemovePiece( t, p );
1831                    }
1832                }
1833
1834                t->needsCompletenessCheck = true;
1835            }
1836            break;
1837        }
1838
1839        case TR_PEER_ERROR:
1840            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1841            {
1842                /* some protocol error from the peer */
1843                peer->doPurge = 1;
1844                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1845                        tr_atomAddrStr( peer->atom ) );
1846            }
1847            else
1848            {
1849                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1850            }
1851            break;
1852
1853        default:
1854            assert( 0 );
1855    }
1856
1857    torrentUnlock( t );
1858}
1859
1860static int
1861getDefaultShelfLife( uint8_t from )
1862{
1863    /* in general, peers obtained from firsthand contact
1864     * are better than those from secondhand, etc etc */
1865    switch( from )
1866    {
1867        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1868        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1869        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1870        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1871        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1872        case TR_PEER_FROM_RESUME   : return 60 * 60;
1873        case TR_PEER_FROM_LPD      : return 10 * 60;
1874        default                    : return 60 * 60;
1875    }
1876}
1877
1878static void
1879ensureAtomExists( Torrent           * t,
1880                  const tr_address  * addr,
1881                  const tr_port       port,
1882                  const uint8_t       flags,
1883                  const int8_t        seedProbability,
1884                  const uint8_t       from )
1885{
1886    struct peer_atom * a;
1887
1888    assert( tr_address_is_valid( addr ) );
1889    assert( from < TR_PEER_FROM__MAX );
1890
1891    a = getExistingAtom( t, addr );
1892
1893    if( a == NULL )
1894    {
1895        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1896        a = tr_new0( struct peer_atom, 1 );
1897        a->addr = *addr;
1898        a->port = port;
1899        a->flags = flags;
1900        a->fromFirst = from;
1901        a->fromBest = from;
1902        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1903        a->blocklisted = -1;
1904        atomSetSeedProbability( a, seedProbability );
1905        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1906
1907        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1908    }
1909    else
1910    {
1911        if( from < a->fromBest )
1912            a->fromBest = from;
1913
1914        if( a->seedProbability == -1 )
1915            atomSetSeedProbability( a, seedProbability );
1916
1917        a->flags |= flags;
1918    }
1919}
1920
1921static int
1922getMaxPeerCount( const tr_torrent * tor )
1923{
1924    return tor->maxConnectedPeers;
1925}
1926
1927static int
1928getPeerCount( const Torrent * t )
1929{
1930    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1931}
1932
1933/* FIXME: this is kind of a mess. */
1934static bool
1935myHandshakeDoneCB( tr_handshake  * handshake,
1936                   tr_peerIo     * io,
1937                   bool            readAnythingFromPeer,
1938                   bool            isConnected,
1939                   const uint8_t * peer_id,
1940                   void          * vmanager )
1941{
1942    bool               ok = isConnected;
1943    bool               success = false;
1944    tr_port            port;
1945    const tr_address * addr;
1946    tr_peerMgr       * manager = vmanager;
1947    Torrent          * t;
1948    tr_handshake     * ours;
1949
1950    assert( io );
1951    assert( tr_isBool( ok ) );
1952
1953    t = tr_peerIoHasTorrentHash( io )
1954        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1955        : NULL;
1956
1957    if( tr_peerIoIsIncoming ( io ) )
1958        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1959                                        handshake, handshakeCompare );
1960    else if( t )
1961        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1962                                        handshake, handshakeCompare );
1963    else
1964        ours = handshake;
1965
1966    assert( ours );
1967    assert( ours == handshake );
1968
1969    if( t )
1970        torrentLock( t );
1971
1972    addr = tr_peerIoGetAddress( io, &port );
1973
1974    if( !ok || !t || !t->isRunning )
1975    {
1976        if( t )
1977        {
1978            struct peer_atom * atom = getExistingAtom( t, addr );
1979            if( atom )
1980            {
1981                ++atom->numFails;
1982
1983                if( !readAnythingFromPeer )
1984                {
1985                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1986                    atom->flags2 |= MYFLAG_UNREACHABLE;
1987                }
1988            }
1989        }
1990    }
1991    else /* looking good */
1992    {
1993        struct peer_atom * atom;
1994
1995        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1996        atom = getExistingAtom( t, addr );
1997        atom->time = tr_time( );
1998        atom->piece_data_time = 0;
1999        atom->lastConnectionAt = tr_time( );
2000
2001        if( !tr_peerIoIsIncoming( io ) )
2002        {
2003            atom->flags |= ADDED_F_CONNECTABLE;
2004            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2005        }
2006
2007        /* In principle, this flag specifies whether the peer groks uTP,
2008           not whether it's currently connected over uTP. */
2009        if( io->utp_socket )
2010            atom->flags |= ADDED_F_UTP_FLAGS;
2011
2012        if( atom->flags2 & MYFLAG_BANNED )
2013        {
2014            tordbg( t, "banned peer %s tried to reconnect",
2015                    tr_atomAddrStr( atom ) );
2016        }
2017        else if( tr_peerIoIsIncoming( io )
2018               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2019
2020        {
2021        }
2022        else
2023        {
2024            tr_peer * peer = atom->peer;
2025
2026            if( peer ) /* we already have this peer */
2027            {
2028            }
2029            else
2030            {
2031                peer = getPeer( t, atom );
2032                tr_free( peer->client );
2033
2034                if( !peer_id )
2035                    peer->client = NULL;
2036                else {
2037                    char client[128];
2038                    tr_clientForId( client, sizeof( client ), peer_id );
2039                    peer->client = tr_strdup( client );
2040                }
2041
2042                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2043                                                                balanced by our unref in peerDelete()  */
2044                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2045                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2046
2047                success = true;
2048            }
2049        }
2050    }
2051
2052    if( t )
2053        torrentUnlock( t );
2054
2055    return success;
2056}
2057
2058void
2059tr_peerMgrAddIncoming( tr_peerMgr * manager,
2060                       tr_address * addr,
2061                       tr_port      port,
2062                       int          socket,
2063                       struct UTPSocket * utp_socket )
2064{
2065    tr_session * session;
2066
2067    managerLock( manager );
2068
2069    assert( tr_isSession( manager->session ) );
2070    session = manager->session;
2071
2072    if( tr_sessionIsAddressBlocked( session, addr ) )
2073    {
2074        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2075        if(socket >= 0)
2076            tr_netClose( session, socket );
2077        else
2078            UTP_Close( utp_socket );
2079    }
2080    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2081    {
2082        if(socket >= 0)
2083            tr_netClose( session, socket );
2084        else
2085            UTP_Close( utp_socket );
2086    }
2087    else /* we don't have a connection to them yet... */
2088    {
2089        tr_peerIo *    io;
2090        tr_handshake * handshake;
2091
2092        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2093
2094        handshake = tr_handshakeNew( io,
2095                                     session->encryptionMode,
2096                                     myHandshakeDoneCB,
2097                                     manager );
2098
2099        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2100
2101        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2102                                 handshakeCompare );
2103    }
2104
2105    managerUnlock( manager );
2106}
2107
2108void
2109tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2110                  const tr_pex * pex, int8_t seedProbability )
2111{
2112    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2113    {
2114        Torrent * t = tor->torrentPeers;
2115        managerLock( t->manager );
2116
2117        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2118            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2119                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2120
2121        managerUnlock( t->manager );
2122    }
2123}
2124
2125void
2126tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2127{
2128    Torrent * t = tor->torrentPeers;
2129    const int n = tr_ptrArraySize( &t->pool );
2130    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2131    struct peer_atom ** end = it + n;
2132
2133    while( it != end )
2134        atomSetSeed( t, *it++ );
2135}
2136
2137tr_pex *
2138tr_peerMgrCompactToPex( const void *    compact,
2139                        size_t          compactLen,
2140                        const uint8_t * added_f,
2141                        size_t          added_f_len,
2142                        size_t *        pexCount )
2143{
2144    size_t          i;
2145    size_t          n = compactLen / 6;
2146    const uint8_t * walk = compact;
2147    tr_pex *        pex = tr_new0( tr_pex, n );
2148
2149    for( i = 0; i < n; ++i )
2150    {
2151        pex[i].addr.type = TR_AF_INET;
2152        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2153        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2154        if( added_f && ( n == added_f_len ) )
2155            pex[i].flags = added_f[i];
2156    }
2157
2158    *pexCount = n;
2159    return pex;
2160}
2161
2162tr_pex *
2163tr_peerMgrCompact6ToPex( const void    * compact,
2164                         size_t          compactLen,
2165                         const uint8_t * added_f,
2166                         size_t          added_f_len,
2167                         size_t        * pexCount )
2168{
2169    size_t          i;
2170    size_t          n = compactLen / 18;
2171    const uint8_t * walk = compact;
2172    tr_pex *        pex = tr_new0( tr_pex, n );
2173
2174    for( i = 0; i < n; ++i )
2175    {
2176        pex[i].addr.type = TR_AF_INET6;
2177        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2178        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2179        if( added_f && ( n == added_f_len ) )
2180            pex[i].flags = added_f[i];
2181    }
2182
2183    *pexCount = n;
2184    return pex;
2185}
2186
2187tr_pex *
2188tr_peerMgrArrayToPex( const void * array,
2189                      size_t       arrayLen,
2190                      size_t      * pexCount )
2191{
2192    size_t          i;
2193    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2194    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2195    const uint8_t * walk = array;
2196    tr_pex        * pex = tr_new0( tr_pex, n );
2197
2198    for( i = 0 ; i < n ; i++ ) {
2199        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2200        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2201        pex[i].flags = 0x00;
2202        walk += sizeof( tr_address ) + 2;
2203    }
2204
2205    *pexCount = n;
2206    return pex;
2207}
2208
2209/**
2210***
2211**/
2212
2213static void
2214tr_peerMgrSetBlame( tr_torrent     * tor,
2215                    tr_piece_index_t pieceIndex,
2216                    int              success )
2217{
2218    if( !success )
2219    {
2220        int        peerCount, i;
2221        Torrent *  t = tor->torrentPeers;
2222        tr_peer ** peers;
2223
2224        assert( torrentIsLocked( t ) );
2225
2226        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2227        for( i = 0; i < peerCount; ++i )
2228        {
2229            tr_peer * peer = peers[i];
2230            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2231            {
2232                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2233                        tr_atomAddrStr( peer->atom ),
2234                        pieceIndex, (int)peer->strikes + 1 );
2235                addStrike( t, peer );
2236            }
2237        }
2238    }
2239}
2240
2241int
2242tr_pexCompare( const void * va, const void * vb )
2243{
2244    const tr_pex * a = va;
2245    const tr_pex * b = vb;
2246    int i;
2247
2248    assert( tr_isPex( a ) );
2249    assert( tr_isPex( b ) );
2250
2251    if(( i = tr_address_compare( &a->addr, &b->addr )))
2252        return i;
2253
2254    if( a->port != b->port )
2255        return a->port < b->port ? -1 : 1;
2256
2257    return 0;
2258}
2259
2260#if 0
2261static int
2262peerPrefersCrypto( const tr_peer * peer )
2263{
2264    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2265        return true;
2266
2267    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2268        return false;
2269
2270    return tr_peerIoIsEncrypted( peer->io );
2271}
2272#endif
2273
2274/* better goes first */
2275static int
2276compareAtomsByUsefulness( const void * va, const void *vb )
2277{
2278    const struct peer_atom * a = * (const struct peer_atom**) va;
2279    const struct peer_atom * b = * (const struct peer_atom**) vb;
2280
2281    assert( tr_isAtom( a ) );
2282    assert( tr_isAtom( b ) );
2283
2284    if( a->piece_data_time != b->piece_data_time )
2285        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2286    if( a->fromBest != b->fromBest )
2287        return a->fromBest < b->fromBest ? -1 : 1;
2288    if( a->numFails != b->numFails )
2289        return a->numFails < b->numFails ? -1 : 1;
2290
2291    return 0;
2292}
2293
2294static bool
2295isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2296{
2297    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2298        return false;
2299
2300    if( peerIsInUse( tor->torrentPeers, atom ) )
2301        return true;
2302
2303    if( isAtomBlocklisted( tor->session, atom ) )
2304        return false;
2305
2306    if( atom->flags2 & MYFLAG_BANNED )
2307        return false;
2308
2309    return true;
2310}
2311
2312int
2313tr_peerMgrGetPeers( tr_torrent   * tor,
2314                    tr_pex      ** setme_pex,
2315                    uint8_t        af,
2316                    uint8_t        list_mode,
2317                    int            maxCount )
2318{
2319    int i;
2320    int n;
2321    int count = 0;
2322    int atomCount = 0;
2323    const Torrent * t = tor->torrentPeers;
2324    struct peer_atom ** atoms = NULL;
2325    tr_pex * pex;
2326    tr_pex * walk;
2327
2328    assert( tr_isTorrent( tor ) );
2329    assert( setme_pex != NULL );
2330    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2331    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2332
2333    managerLock( t->manager );
2334
2335    /**
2336    ***  build a list of atoms
2337    **/
2338
2339    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2340    {
2341        int i;
2342        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2343        atomCount = tr_ptrArraySize( &t->peers );
2344        atoms = tr_new( struct peer_atom *, atomCount );
2345        for( i=0; i<atomCount; ++i )
2346            atoms[i] = peers[i]->atom;
2347    }
2348    else /* TR_PEERS_INTERESTING */
2349    {
2350        int i;
2351        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2352        n = tr_ptrArraySize( &t->pool );
2353        atoms = tr_new( struct peer_atom *, n );
2354        for( i=0; i<n; ++i )
2355            if( isAtomInteresting( tor, atomBase[i] ) )
2356                atoms[atomCount++] = atomBase[i];
2357    }
2358
2359    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2360
2361    /**
2362    ***  add the first N of them into our return list
2363    **/
2364
2365    n = MIN( atomCount, maxCount );
2366    pex = walk = tr_new0( tr_pex, n );
2367
2368    for( i=0; i<atomCount && count<n; ++i )
2369    {
2370        const struct peer_atom * atom = atoms[i];
2371        if( atom->addr.type == af )
2372        {
2373            assert( tr_address_is_valid( &atom->addr ) );
2374            walk->addr = atom->addr;
2375            walk->port = atom->port;
2376            walk->flags = atom->flags;
2377            ++count;
2378            ++walk;
2379        }
2380    }
2381
2382    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2383
2384    assert( ( walk - pex ) == count );
2385    *setme_pex = pex;
2386
2387    /* cleanup */
2388    tr_free( atoms );
2389    managerUnlock( t->manager );
2390    return count;
2391}
2392
2393static void atomPulse      ( int, short, void * );
2394static void bandwidthPulse ( int, short, void * );
2395static void rechokePulse   ( int, short, void * );
2396static void reconnectPulse ( int, short, void * );
2397
2398static struct event *
2399createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2400{
2401    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2402    tr_timerAddMsec( timer, msec );
2403    return timer;
2404}
2405
2406static void
2407ensureMgrTimersExist( struct tr_peerMgr * m )
2408{
2409    if( m->atomTimer == NULL )
2410        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2411
2412    if( m->bandwidthTimer == NULL )
2413        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2414
2415    if( m->rechokeTimer == NULL )
2416        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2417
2418    if( m->refillUpkeepTimer == NULL )
2419        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2420}
2421
2422void
2423tr_peerMgrStartTorrent( tr_torrent * tor )
2424{
2425    Torrent * t = tor->torrentPeers;
2426
2427    assert( tr_isTorrent( tor ) );
2428    assert( tr_torrentIsLocked( tor ) );
2429
2430    ensureMgrTimersExist( t->manager );
2431
2432    t->isRunning = true;
2433    t->maxPeers = t->tor->maxConnectedPeers;
2434    t->pieceSortState = PIECES_UNSORTED;
2435
2436    rechokePulse( 0, 0, t->manager );
2437}
2438
2439static void
2440stopTorrent( Torrent * t )
2441{
2442    tr_peer * peer;
2443
2444    t->isRunning = false;
2445
2446    replicationFree( t );
2447    invalidatePieceSorting( t );
2448
2449    /* disconnect the peers. */
2450    while(( peer = tr_ptrArrayPop( &t->peers )))
2451        peerDelete( t, peer );
2452
2453    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2454     * which removes the handshake from t->outgoingHandshakes... */
2455    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2456        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2457}
2458
2459void
2460tr_peerMgrStopTorrent( tr_torrent * tor )
2461{
2462    assert( tr_isTorrent( tor ) );
2463    assert( tr_torrentIsLocked( tor ) );
2464
2465    stopTorrent( tor->torrentPeers );
2466}
2467
2468void
2469tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2470{
2471    assert( tr_isTorrent( tor ) );
2472    assert( tr_torrentIsLocked( tor ) );
2473    assert( tor->torrentPeers == NULL );
2474
2475    tor->torrentPeers = torrentNew( manager, tor );
2476}
2477
2478void
2479tr_peerMgrRemoveTorrent( tr_torrent * tor )
2480{
2481    assert( tr_isTorrent( tor ) );
2482    assert( tr_torrentIsLocked( tor ) );
2483
2484    stopTorrent( tor->torrentPeers );
2485    torrentFree( tor->torrentPeers );
2486}
2487
2488void
2489tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2490{
2491    const tr_bitfield * have = &peer->have;
2492
2493    if( tr_bitfieldHasAll( have ) )
2494    {
2495        peer->progress = 1.0;
2496    }
2497    else if( tr_bitfieldHasNone( have ) )
2498    {
2499        peer->progress = 0.0;
2500    }
2501    else
2502    {
2503        const float true_count = tr_bitfieldCountTrueBits( have );
2504
2505        if( tr_torrentHasMetadata( tor ) )
2506            peer->progress = true_count / tor->info.pieceCount;
2507        else /* without pieceCount, this result is only a best guess... */
2508            peer->progress = true_count / ( have->bit_count + 1 );
2509    }
2510
2511    if( peer->atom && ( peer->progress >= 1.0 ) )
2512        atomSetSeed( tor->torrentPeers, peer->atom );
2513}
2514
2515void
2516tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2517{
2518    int i;
2519    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2520    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2521
2522    /* some peer_msgs' progress fields may not be accurate if we
2523       didn't have the metadata before now... so refresh them all... */
2524    for( i=0; i<peerCount; ++i )
2525        tr_peerUpdateProgress( tor, peers[i] );
2526}
2527
2528void
2529tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2530{
2531    assert( tr_isTorrent( tor ) );
2532    assert( torrentIsLocked( tor->torrentPeers ) );
2533    assert( tab != NULL );
2534    assert( tabCount > 0 );
2535
2536    memset( tab, 0, tabCount );
2537
2538    if( tr_torrentHasMetadata( tor ) )
2539    {
2540        tr_piece_index_t i;
2541        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2542        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2543        const float interval = tor->info.pieceCount / (float)tabCount;
2544        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2545
2546        for( i=0; i<tabCount; ++i )
2547        {
2548            const int piece = i * interval;
2549
2550            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2551                tab[i] = -1;
2552            else if( peerCount ) {
2553                int j;
2554                for( j=0; j<peerCount; ++j )
2555                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2556                        ++tab[i];
2557            }
2558        }
2559    }
2560}
2561
2562static bool
2563peerIsSeed( const tr_peer * peer )
2564{
2565    if( peer->progress >= 1.0 )
2566        return true;
2567
2568    if( peer->atom && atomIsSeed( peer->atom ) )
2569        return true;
2570
2571    return false;
2572}
2573
2574/* count how many bytes we want that connected peers have */
2575uint64_t
2576tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2577{
2578    size_t i;
2579    size_t n;
2580    uint64_t desiredAvailable;
2581    const Torrent * t = tor->torrentPeers;
2582
2583    /* common shortcuts... */
2584
2585    if( tr_torrentIsSeed( t->tor ) )
2586        return 0;
2587
2588    if( !tr_torrentHasMetadata( tor ) )
2589        return 0;
2590
2591    n = tr_ptrArraySize( &t->peers );
2592    if( n == 0 )
2593        return 0;
2594    else {
2595        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2596        for( i=0; i<n; ++i )
2597            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2598                return tr_cpLeftUntilDone( &tor->completion );
2599    }
2600
2601    if( !t->pieceReplication || !t->pieceReplicationSize )
2602        return 0;
2603
2604    /* do it the hard way */
2605
2606    desiredAvailable = 0;
2607    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2608        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2609            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2610
2611    assert( desiredAvailable <= tor->info.totalSize );
2612    return desiredAvailable;
2613}
2614
2615void
2616tr_peerMgrTorrentStats( tr_torrent  * tor,
2617                        int         * setmePeersConnected,
2618                        int         * setmeWebseedsSendingToUs,
2619                        int         * setmePeersSendingToUs,
2620                        int         * setmePeersGettingFromUs,
2621                        int         * setmePeersFrom )
2622{
2623    int i, size;
2624    const Torrent * t = tor->torrentPeers;
2625    const tr_peer ** peers;
2626
2627    assert( tr_torrentIsLocked( tor ) );
2628
2629    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2630    size = tr_ptrArraySize( &t->peers );
2631
2632    *setmePeersConnected       = 0;
2633    *setmePeersGettingFromUs   = 0;
2634    *setmePeersSendingToUs     = 0;
2635    *setmeWebseedsSendingToUs  = 0;
2636
2637    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2638        setmePeersFrom[i] = 0;
2639
2640    for( i=0; i<size; ++i )
2641    {
2642        const tr_peer * peer = peers[i];
2643        const struct peer_atom * atom = peer->atom;
2644
2645        if( peer->io == NULL ) /* not connected */
2646            continue;
2647
2648        ++*setmePeersConnected;
2649
2650        ++setmePeersFrom[atom->fromFirst];
2651
2652        if( clientIsDownloadingFrom( tor, peer ) )
2653            ++*setmePeersSendingToUs;
2654
2655        if( clientIsUploadingTo( peer ) )
2656            ++*setmePeersGettingFromUs;
2657    }
2658
2659    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2660}
2661
2662double*
2663tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2664{
2665    int i;
2666    const Torrent * t = tor->torrentPeers;
2667    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2668    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2669    const uint64_t now = tr_time_msec( );
2670    double * ret = tr_new0( double, webseedCount );
2671
2672    assert( tr_isTorrent( tor ) );
2673    assert( tr_torrentIsLocked( tor ) );
2674    assert( t->manager != NULL );
2675    assert( webseedCount == tor->info.webseedCount );
2676
2677    for( i=0; i<webseedCount; ++i ) {
2678        int Bps;
2679        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2680            ret[i] = Bps / (double)tr_speed_K;
2681        else
2682            ret[i] = -1.0;
2683    }
2684
2685    return ret;
2686}
2687
2688int
2689tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2690{
2691    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2692}
2693
2694struct tr_peer_stat *
2695tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2696{
2697    int i;
2698    const Torrent * t = tor->torrentPeers;
2699    const int size = tr_ptrArraySize( &t->peers );
2700    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2701    const uint64_t now_msec = tr_time_msec( );
2702    const time_t now = tr_time();
2703    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2704
2705    assert( tr_isTorrent( tor ) );
2706    assert( tr_torrentIsLocked( tor ) );
2707    assert( t->manager );
2708
2709    for( i=0; i<size; ++i )
2710    {
2711        char *                   pch;
2712        const tr_peer *          peer = peers[i];
2713        const struct peer_atom * atom = peer->atom;
2714        tr_peer_stat *           stat = ret + i;
2715
2716        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2717        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2718                   sizeof( stat->client ) );
2719        stat->port                = ntohs( peer->atom->port );
2720        stat->from                = atom->fromFirst;
2721        stat->progress            = peer->progress;
2722        stat->isUTP               = peer->io->utp_socket != NULL;
2723        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2724        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2725        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2726        stat->peerIsChoked        = peer->peerIsChoked;
2727        stat->peerIsInterested    = peer->peerIsInterested;
2728        stat->clientIsChoked      = peer->clientIsChoked;
2729        stat->clientIsInterested  = peer->clientIsInterested;
2730        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2731        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2732        stat->isUploadingTo       = clientIsUploadingTo( peer );
2733        stat->isSeed              = peerIsSeed( peer );
2734
2735        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2736        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2737        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2738        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2739
2740        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2741        stat->pendingReqsToClient = peer->pendingReqsToClient;
2742
2743        pch = stat->flagStr;
2744        if( stat->isUTP ) *pch++ = 'T';
2745        if( t->optimistic == peer ) *pch++ = 'O';
2746        if( stat->isDownloadingFrom ) *pch++ = 'D';
2747        else if( stat->clientIsInterested ) *pch++ = 'd';
2748        if( stat->isUploadingTo ) *pch++ = 'U';
2749        else if( stat->peerIsInterested ) *pch++ = 'u';
2750        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2751        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2752        if( stat->isEncrypted ) *pch++ = 'E';
2753        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2754        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2755        if( stat->isIncoming ) *pch++ = 'I';
2756        *pch = '\0';
2757    }
2758
2759    *setmeCount = size;
2760
2761    return ret;
2762}
2763
2764/***
2765****
2766****
2767***/
2768
2769void
2770tr_peerMgrClearInterest( tr_torrent * tor )
2771{
2772    int i;
2773    Torrent * t = tor->torrentPeers;
2774    const int peerCount = tr_ptrArraySize( &t->peers );
2775
2776    assert( tr_isTorrent( tor ) );
2777    assert( tr_torrentIsLocked( tor ) );
2778
2779    for( i=0; i<peerCount; ++i )
2780    {
2781        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2782        tr_peerMsgsSetInterested( peer->msgs, false );
2783    }
2784}
2785
2786/* does this peer have any pieces that we want? */
2787static bool
2788isPeerInteresting( const tr_torrent  * const tor,
2789                   const tr_bitfield * const interesting_pieces,
2790                   const tr_peer     * const peer )
2791{
2792    tr_piece_index_t i, n;
2793
2794    /* these cases should have already been handled by the calling code... */
2795    assert( !tr_torrentIsSeed( tor ) );
2796    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2797
2798    if( peerIsSeed( peer ) )
2799        return true;
2800
2801    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2802        if( tr_bitfieldHas( interesting_pieces, i ) && tr_bitfieldHas( &peer->have, i ) )
2803            return true;
2804
2805    return false;
2806}
2807
2808typedef enum
2809{
2810    RECHOKE_STATE_GOOD,
2811    RECHOKE_STATE_UNTESTED,
2812    RECHOKE_STATE_BAD
2813}
2814tr_rechoke_state;
2815
2816struct tr_rechoke_info
2817{
2818    tr_peer * peer;
2819    int salt;
2820    int rechoke_state;
2821};
2822
2823static int
2824compare_rechoke_info( const void * va, const void * vb )
2825{
2826    const struct tr_rechoke_info * a = va;
2827    const struct tr_rechoke_info * b = vb;
2828
2829    if( a->rechoke_state != b->rechoke_state )
2830        return a->rechoke_state - b->rechoke_state;
2831
2832    return a->salt - b->salt;
2833}
2834
2835/* determines who we send "interested" messages to */
2836static void
2837rechokeDownloads( Torrent * t )
2838{
2839    int i;
2840    int maxPeers = 0;
2841    int rechoke_count = 0;
2842    struct tr_rechoke_info * rechoke = NULL;
2843    const int MIN_INTERESTING_PEERS = 5;
2844    const int peerCount = tr_ptrArraySize( &t->peers );
2845    const time_t now = tr_time( );
2846
2847    /* some cases where this function isn't necessary */
2848    if( tr_torrentIsSeed( t->tor ) )
2849        return;
2850    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2851        return;
2852
2853    /* decide HOW MANY peers to be interested in */
2854    {
2855        int blocks = 0;
2856        int cancels = 0;
2857        time_t timeSinceCancel;
2858
2859        /* Count up how many blocks & cancels each peer has.
2860         *
2861         * There are two situations where we send out cancels --
2862         *
2863         * 1. We've got unresponsive peers, which is handled by deciding
2864         *    -which- peers to be interested in.
2865         *
2866         * 2. We've hit our bandwidth cap, which is handled by deciding
2867         *    -how many- peers to be interested in.
2868         *
2869         * We're working on 2. here, so we need to ignore unresponsive
2870         * peers in our calculations lest they confuse Transmission into
2871         * thinking it's hit its bandwidth cap.
2872         */
2873        for( i=0; i<peerCount; ++i )
2874        {
2875            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2876            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2877            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2878
2879            if( b == 0 ) /* ignore unresponsive peers, as described above */
2880                continue;
2881
2882            blocks += b;
2883            cancels += c;
2884        }
2885
2886        if( cancels > 0 )
2887        {
2888            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2889             * higher values indicate more congestion. */
2890            const double cancelRate = cancels / (double)(cancels + blocks);
2891            const double mult = 1 - MIN( cancelRate, 0.5 );
2892            maxPeers = t->interestedCount * mult;
2893            tordbg( t, "cancel rate is %.3f -- reducing the "
2894                       "number of peers we're interested in by %.0f percent",
2895                       cancelRate, mult * 100 );
2896            t->lastCancel = now;
2897        }
2898
2899        timeSinceCancel = now - t->lastCancel;
2900        if( timeSinceCancel )
2901        {
2902            const int maxIncrease = 15;
2903            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2904            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2905            const int inc = maxIncrease * mult;
2906            maxPeers = t->maxPeers + inc;
2907            tordbg( t, "time since last cancel is %li -- increasing the "
2908                       "number of peers we're interested in by %d",
2909                       timeSinceCancel, inc );
2910        }
2911    }
2912
2913    /* don't let the previous section's number tweaking go too far... */
2914    if( maxPeers < MIN_INTERESTING_PEERS )
2915        maxPeers = MIN_INTERESTING_PEERS;
2916    if( maxPeers > t->tor->maxConnectedPeers )
2917        maxPeers = t->tor->maxConnectedPeers;
2918
2919    t->maxPeers = maxPeers;
2920
2921    if( peerCount > 0 )
2922    {
2923        const tr_torrent * const tor = t->tor;
2924        const int n = tor->info.pieceCount;
2925        tr_bitfield interesting_pieces = TR_BITFIELD_INIT;
2926
2927        /* build a bitfield of interesting pieces... */
2928        tr_bitfieldConstruct( &interesting_pieces, n );
2929        for( i=0; i<n; i++ )
2930            if( !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i ) )
2931                tr_bitfieldAdd( &interesting_pieces, i );
2932
2933        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2934        for( i=0; i<peerCount; ++i )
2935        {
2936            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2937
2938            if( !isPeerInteresting( t->tor, &interesting_pieces, peer ) )
2939            {
2940                tr_peerMsgsSetInterested( peer->msgs, false );
2941            }
2942            else
2943            {
2944                tr_rechoke_state rechoke_state;
2945                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2946                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2947
2948                if( !blocks && !cancels )
2949                    rechoke_state = RECHOKE_STATE_UNTESTED;
2950                else if( !cancels )
2951                    rechoke_state = RECHOKE_STATE_GOOD;
2952                else if( !blocks )
2953                    rechoke_state = RECHOKE_STATE_BAD;
2954                else if( ( cancels * 10 ) < blocks )
2955                    rechoke_state = RECHOKE_STATE_GOOD;
2956                else
2957                    rechoke_state = RECHOKE_STATE_BAD;
2958
2959                if( rechoke == NULL )
2960                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2961
2962                 rechoke[rechoke_count].peer = peer;
2963                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2964                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2965                 rechoke_count++;
2966            }
2967
2968        }
2969
2970        tr_bitfieldDestruct( &interesting_pieces );
2971    }
2972
2973    /* now that we know which & how many peers to be interested in... update the peer interest */
2974    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2975    t->interestedCount = MIN( maxPeers, rechoke_count );
2976    for( i=0; i<rechoke_count; ++i )
2977        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2978
2979    /* cleanup */
2980    tr_free( rechoke );
2981}
2982
2983/**
2984***
2985**/
2986
2987struct ChokeData
2988{
2989    bool            isInterested;
2990    bool            wasChoked;
2991    bool            isChoked;
2992    int             rate;
2993    int             salt;
2994    tr_peer *       peer;
2995};
2996
2997static int
2998compareChoke( const void * va, const void * vb )
2999{
3000    const struct ChokeData * a = va;
3001    const struct ChokeData * b = vb;
3002
3003    if( a->rate != b->rate ) /* prefer higher overall speeds */
3004        return a->rate > b->rate ? -1 : 1;
3005
3006    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
3007        return a->wasChoked ? 1 : -1;
3008
3009    if( a->salt != b->salt ) /* random order */
3010        return a->salt - b->salt;
3011
3012    return 0;
3013}
3014
3015/* is this a new connection? */
3016static int
3017isNew( const tr_peer * peer )
3018{
3019    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3020}
3021
3022/* get a rate for deciding which peers to choke and unchoke. */
3023static int
3024getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3025{
3026    int Bps;
3027
3028    if( tr_torrentIsSeed( tor ) )
3029        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3030
3031    /* downloading a private torrent... take upload speed into account
3032     * because there may only be a small window of opportunity to share */
3033    else if( tr_torrentIsPrivate( tor ) )
3034        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3035            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3036
3037    /* downloading a public torrent */
3038    else
3039        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3040
3041    /* convert it to bytes per second */
3042    return Bps;
3043}
3044
3045static inline bool
3046isBandwidthMaxedOut( const tr_bandwidth * b,
3047                     const uint64_t now_msec, tr_direction dir )
3048{
3049    if( !tr_bandwidthIsLimited( b, dir ) )
3050        return false;
3051    else {
3052        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3053        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3054        return got >= want;
3055    }
3056}
3057
3058static void
3059rechokeUploads( Torrent * t, const uint64_t now )
3060{
3061    int i, size, unchokedInterested;
3062    const int peerCount = tr_ptrArraySize( &t->peers );
3063    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3064    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3065    const tr_session * session = t->manager->session;
3066    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3067    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3068
3069    assert( torrentIsLocked( t ) );
3070
3071    /* an optimistic unchoke peer's "optimistic"
3072     * state lasts for N calls to rechokeUploads(). */
3073    if( t->optimisticUnchokeTimeScaler > 0 )
3074        t->optimisticUnchokeTimeScaler--;
3075    else
3076        t->optimistic = NULL;
3077
3078    /* sort the peers by preference and rate */
3079    for( i = 0, size = 0; i < peerCount; ++i )
3080    {
3081        tr_peer * peer = peers[i];
3082        struct peer_atom * atom = peer->atom;
3083
3084        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3085        {
3086            tr_peerMsgsSetChoke( peer->msgs, true );
3087        }
3088        else if( chokeAll ) /* choke everyone if we're not uploading */
3089        {
3090            tr_peerMsgsSetChoke( peer->msgs, true );
3091        }
3092        else if( peer != t->optimistic )
3093        {
3094            struct ChokeData * n = &choke[size++];
3095            n->peer         = peer;
3096            n->isInterested = peer->peerIsInterested;
3097            n->wasChoked    = peer->peerIsChoked;
3098            n->rate         = getRate( t->tor, atom, now );
3099            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3100            n->isChoked     = true;
3101        }
3102    }
3103
3104    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3105
3106    /**
3107     * Reciprocation and number of uploads capping is managed by unchoking
3108     * the N peers which have the best upload rate and are interested.
3109     * This maximizes the client's download rate. These N peers are
3110     * referred to as downloaders, because they are interested in downloading
3111     * from the client.
3112     *
3113     * Peers which have a better upload rate (as compared to the downloaders)
3114     * but aren't interested get unchoked. If they become interested, the
3115     * downloader with the worst upload rate gets choked. If a client has
3116     * a complete file, it uses its upload rate rather than its download
3117     * rate to decide which peers to unchoke.
3118     *
3119     * If our bandwidth is maxed out, don't unchoke any more peers.
3120     */
3121    unchokedInterested = 0;
3122    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3123        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3124        if( choke[i].isInterested )
3125            ++unchokedInterested;
3126    }
3127
3128    /* optimistic unchoke */
3129    if( !t->optimistic && !isMaxedOut && (i<size) )
3130    {
3131        int n;
3132        struct ChokeData * c;
3133        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3134
3135        for( ; i<size; ++i )
3136        {
3137            if( choke[i].isInterested )
3138            {
3139                const tr_peer * peer = choke[i].peer;
3140                int x = 1, y;
3141                if( isNew( peer ) ) x *= 3;
3142                for( y=0; y<x; ++y )
3143                    tr_ptrArrayAppend( &randPool, &choke[i] );
3144            }
3145        }
3146
3147        if(( n = tr_ptrArraySize( &randPool )))
3148        {
3149            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3150            c->isChoked = false;
3151            t->optimistic = c->peer;
3152            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3153        }
3154
3155        tr_ptrArrayDestruct( &randPool, NULL );
3156    }
3157
3158    for( i=0; i<size; ++i )
3159        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3160
3161    /* cleanup */
3162    tr_free( choke );
3163}
3164
3165static void
3166rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3167{
3168    tr_torrent * tor = NULL;
3169    tr_peerMgr * mgr = vmgr;
3170    const uint64_t now = tr_time_msec( );
3171
3172    managerLock( mgr );
3173
3174    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3175        if( tor->isRunning ) {
3176            Torrent * t = tor->torrentPeers;
3177            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3178                rechokeUploads( t, now );
3179                rechokeDownloads( t );
3180            }
3181        }
3182    }
3183
3184    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3185    managerUnlock( mgr );
3186}
3187
3188/***
3189****
3190****  Life and Death
3191****
3192***/
3193
3194static bool
3195shouldPeerBeClosed( const Torrent    * t,
3196                    const tr_peer    * peer,
3197                    int                peerCount,
3198                    const time_t       now )
3199{
3200    const tr_torrent *       tor = t->tor;
3201    const struct peer_atom * atom = peer->atom;
3202
3203    /* if it's marked for purging, close it */
3204    if( peer->doPurge )
3205    {
3206        tordbg( t, "purging peer %s because its doPurge flag is set",
3207                tr_atomAddrStr( atom ) );
3208        return true;
3209    }
3210
3211    /* disconnect if we're both seeds and enough time has passed for PEX */
3212    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3213        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3214
3215    /* disconnect if it's been too long since piece data has been transferred.
3216     * this is on a sliding scale based on number of available peers... */
3217    {
3218        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3219        /* if we have >= relaxIfFewerThan, strictness is 100%.
3220         * if we have zero connections, strictness is 0% */
3221        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3222                               ? 1.0
3223                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3224        const int lo = MIN_UPLOAD_IDLE_SECS;
3225        const int hi = MAX_UPLOAD_IDLE_SECS;
3226        const int limit = hi - ( ( hi - lo ) * strictness );
3227        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3228/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3229        if( idleTime > limit ) {
3230            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3231                       tr_atomAddrStr( atom ), idleTime );
3232            return true;
3233        }
3234    }
3235
3236    return false;
3237}
3238
3239static tr_peer **
3240getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3241{
3242    int i, peerCount, outsize;
3243    struct tr_peer ** ret = NULL;
3244    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3245
3246    assert( torrentIsLocked( t ) );
3247
3248    for( i = outsize = 0; i < peerCount; ++i ) {
3249        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3250            if( ret == NULL )
3251                ret = tr_new( tr_peer *, peerCount );
3252            ret[outsize++] = peers[i];
3253        }
3254    }
3255
3256    *setmeSize = outsize;
3257    return ret;
3258}
3259
3260static int
3261getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3262{
3263    int sec;
3264
3265    /* if we were recently connected to this peer and transferring piece
3266     * data, try to reconnect to them sooner rather that later -- we don't
3267     * want network troubles to get in the way of a good peer. */
3268    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3269        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3270
3271    /* don't allow reconnects more often than our minimum */
3272    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3273        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3274
3275    /* otherwise, the interval depends on how many times we've tried
3276     * and failed to connect to the peer */
3277    else switch( atom->numFails ) {
3278        case 0: sec = 0; break;
3279        case 1: sec = 5; break;
3280        case 2: sec = 2 * 60; break;
3281        case 3: sec = 15 * 60; break;
3282        case 4: sec = 30 * 60; break;
3283        case 5: sec = 60 * 60; break;
3284        default: sec = 120 * 60; break;
3285    }
3286
3287    /* penalize peers that were unreachable the last time we tried */
3288    if( atom->flags2 & MYFLAG_UNREACHABLE )
3289        sec += sec;
3290
3291    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3292    return sec;
3293}
3294
3295static void
3296removePeer( Torrent * t, tr_peer * peer )
3297{
3298    tr_peer * removed;
3299    struct peer_atom * atom = peer->atom;
3300
3301    assert( torrentIsLocked( t ) );
3302    assert( atom );
3303
3304    atom->time = tr_time( );
3305
3306    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3307
3308    if( replicationExists( t ) )
3309        tr_decrReplicationFromBitfield( t, &peer->have );
3310
3311    assert( removed == peer );
3312    peerDelete( t, removed );
3313}
3314
3315static void
3316closePeer( Torrent * t, tr_peer * peer )
3317{
3318    struct peer_atom * atom;
3319
3320    assert( t != NULL );
3321    assert( peer != NULL );
3322
3323    atom = peer->atom;
3324
3325    /* if we transferred piece data, then they might be good peers,
3326       so reset their `numFails' weight to zero. otherwise we connected
3327       to them fruitlessly, so mark it as another fail */
3328    if( atom->piece_data_time ) {
3329        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3330        atom->numFails = 0;
3331    } else {
3332        ++atom->numFails;
3333        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3334    }
3335
3336    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3337    removePeer( t, peer );
3338}
3339
3340static void
3341removeAllPeers( Torrent * t )
3342{
3343    while( !tr_ptrArrayEmpty( &t->peers ) )
3344        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3345}
3346
3347static void
3348closeBadPeers( Torrent * t, const time_t now_sec )
3349{
3350    if( !tr_ptrArrayEmpty( &t->peers ) )
3351    {
3352        int i;
3353        int peerCount;
3354        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3355        for( i=0; i<peerCount; ++i )
3356            closePeer( t, peers[i] );
3357        tr_free( peers );
3358    }
3359}
3360
3361struct peer_liveliness
3362{
3363    tr_peer * peer;
3364    void * clientData;
3365    time_t pieceDataTime;
3366    time_t time;
3367    int speed;
3368    bool doPurge;
3369};
3370
3371static int
3372comparePeerLiveliness( const void * va, const void * vb )
3373{
3374    const struct peer_liveliness * a = va;
3375    const struct peer_liveliness * b = vb;
3376
3377    if( a->doPurge != b->doPurge )
3378        return a->doPurge ? 1 : -1;
3379
3380    if( a->speed != b->speed ) /* faster goes first */
3381        return a->speed > b->speed ? -1 : 1;
3382
3383    /* the one to give us data more recently goes first */
3384    if( a->pieceDataTime != b->pieceDataTime )
3385        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3386
3387    /* the one we connected to most recently goes first */
3388    if( a->time != b->time )
3389        return a->time > b->time ? -1 : 1;
3390
3391    return 0;
3392}
3393
3394static void
3395sortPeersByLivelinessImpl( tr_peer  ** peers,
3396                           void     ** clientData,
3397                           int         n,
3398                           uint64_t    now,
3399                           int (*compare) ( const void *va, const void *vb ) )
3400{
3401    int i;
3402    struct peer_liveliness *lives, *l;
3403
3404    /* build a sortable array of peer + extra info */
3405    lives = l = tr_new0( struct peer_liveliness, n );
3406    for( i=0; i<n; ++i, ++l )
3407    {
3408        tr_peer * p = peers[i];
3409        l->peer = p;
3410        l->doPurge = p->doPurge;
3411        l->pieceDataTime = p->atom->piece_data_time;
3412        l->time = p->atom->time;
3413        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3414                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3415        if( clientData )
3416            l->clientData = clientData[i];
3417    }
3418
3419    /* sort 'em */
3420    assert( n == ( l - lives ) );
3421    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3422
3423    /* build the peer array */
3424    for( i=0, l=lives; i<n; ++i, ++l ) {
3425        peers[i] = l->peer;
3426        if( clientData )
3427            clientData[i] = l->clientData;
3428    }
3429    assert( n == ( l - lives ) );
3430
3431    /* cleanup */
3432    tr_free( lives );
3433}
3434
3435static void
3436sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3437{
3438    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3439}
3440
3441
3442static void
3443enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3444{
3445    int n = tr_ptrArraySize( &t->peers );
3446    const int max = tr_torrentGetPeerLimit( t->tor );
3447    if( n > max )
3448    {
3449        void * base = tr_ptrArrayBase( &t->peers );
3450        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3451        sortPeersByLiveliness( peers, NULL, n, now );
3452        while( n > max )
3453            closePeer( t, peers[--n] );
3454        tr_free( peers );
3455    }
3456}
3457
3458static void
3459enforceSessionPeerLimit( tr_session * session, uint64_t now )
3460{
3461    int n = 0;
3462    tr_torrent * tor = NULL;
3463    const int max = tr_sessionGetPeerLimit( session );
3464
3465    /* count the total number of peers */
3466    while(( tor = tr_torrentNext( session, tor )))
3467        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3468
3469    /* if there are too many, prune out the worst */
3470    if( n > max )
3471    {
3472        tr_peer ** peers = tr_new( tr_peer*, n );
3473        Torrent ** torrents = tr_new( Torrent*, n );
3474
3475        /* populate the peer array */
3476        n = 0;
3477        tor = NULL;
3478        while(( tor = tr_torrentNext( session, tor ))) {
3479            int i;
3480            Torrent * t = tor->torrentPeers;
3481            const int tn = tr_ptrArraySize( &t->peers );
3482            for( i=0; i<tn; ++i, ++n ) {
3483                peers[n] = tr_ptrArrayNth( &t->peers, i );
3484                torrents[n] = t;
3485            }
3486        }
3487
3488        /* sort 'em */
3489        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3490
3491        /* cull out the crappiest */
3492        while( n-- > max )
3493            closePeer( torrents[n], peers[n] );
3494
3495        /* cleanup */
3496        tr_free( torrents );
3497        tr_free( peers );
3498    }
3499}
3500
3501static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3502
3503static void
3504reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3505{
3506    tr_torrent * tor;
3507    tr_peerMgr * mgr = vmgr;
3508    const time_t now_sec = tr_time( );
3509    const uint64_t now_msec = tr_time_msec( );
3510
3511    /**
3512    ***  enforce the per-session and per-torrent peer limits
3513    **/
3514
3515    /* if we're over the per-torrent peer limits, cull some peers */
3516    tor = NULL;
3517    while(( tor = tr_torrentNext( mgr->session, tor )))
3518        if( tor->isRunning )
3519            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3520
3521    /* if we're over the per-session peer limits, cull some peers */
3522    enforceSessionPeerLimit( mgr->session, now_msec );
3523
3524    /* remove crappy peers */
3525    tor = NULL;
3526    while(( tor = tr_torrentNext( mgr->session, tor )))
3527        if( !tor->torrentPeers->isRunning )
3528            removeAllPeers( tor->torrentPeers );
3529        else
3530            closeBadPeers( tor->torrentPeers, now_sec );
3531
3532    /* try to make new peer connections */
3533    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3534}
3535
3536/****
3537*****
3538*****  BANDWIDTH ALLOCATION
3539*****
3540****/
3541
3542static void
3543pumpAllPeers( tr_peerMgr * mgr )
3544{
3545    tr_torrent * tor = NULL;
3546
3547    while(( tor = tr_torrentNext( mgr->session, tor )))
3548    {
3549        int j;
3550        Torrent * t = tor->torrentPeers;
3551
3552        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3553        {
3554            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3555            tr_peerMsgsPulse( peer->msgs );
3556        }
3557    }
3558}
3559
3560static void
3561bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3562{
3563    tr_torrent * tor;
3564    tr_peerMgr * mgr = vmgr;
3565    managerLock( mgr );
3566
3567    /* FIXME: this next line probably isn't necessary... */
3568    pumpAllPeers( mgr );
3569
3570    /* allocate bandwidth to the peers */
3571    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3572    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3573
3574    /* torrent upkeep */
3575    tor = NULL;
3576    while(( tor = tr_torrentNext( mgr->session, tor )))
3577    {
3578        /* possibly stop torrents that have seeded enough */
3579        tr_torrentCheckSeedLimit( tor );
3580
3581        /* run the completeness check for any torrents that need it */
3582        if( tor->torrentPeers->needsCompletenessCheck ) {
3583            tor->torrentPeers->needsCompletenessCheck  = false;
3584            tr_torrentRecheckCompleteness( tor );
3585        }
3586
3587        /* stop torrents that are ready to stop, but couldn't be stopped
3588           earlier during the peer-io callback call chain */
3589        if( tor->isStopping )
3590            tr_torrentStop( tor );
3591    }
3592
3593    reconnectPulse( 0, 0, mgr );
3594
3595    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3596    managerUnlock( mgr );
3597}
3598
3599/***
3600****
3601***/
3602
3603static int
3604compareAtomPtrsByAddress( const void * va, const void *vb )
3605{
3606    const struct peer_atom * a = * (const struct peer_atom**) va;
3607    const struct peer_atom * b = * (const struct peer_atom**) vb;
3608
3609    assert( tr_isAtom( a ) );
3610    assert( tr_isAtom( b ) );
3611
3612    return tr_address_compare( &a->addr, &b->addr );
3613}
3614
3615/* best come first, worst go last */
3616static int
3617compareAtomPtrsByShelfDate( const void * va, const void *vb )
3618{
3619    time_t atime;
3620    time_t btime;
3621    const struct peer_atom * a = * (const struct peer_atom**) va;
3622    const struct peer_atom * b = * (const struct peer_atom**) vb;
3623    const int data_time_cutoff_secs = 60 * 60;
3624    const time_t tr_now = tr_time( );
3625
3626    assert( tr_isAtom( a ) );
3627    assert( tr_isAtom( b ) );
3628
3629    /* primary key: the last piece data time *if* it was within the last hour */
3630    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3631    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3632    if( atime != btime )
3633        return atime > btime ? -1 : 1;
3634
3635    /* secondary key: shelf date. */
3636    if( a->shelf_date != b->shelf_date )
3637        return a->shelf_date > b->shelf_date ? -1 : 1;
3638
3639    return 0;
3640}
3641
3642static int
3643getMaxAtomCount( const tr_torrent * tor )
3644{
3645    const int n = tor->maxConnectedPeers;
3646    /* approximate fit of the old jump discontinuous function */
3647    if( n >= 55 ) return     n + 150;
3648    if( n >= 20 ) return 2 * n + 95;
3649    return               4 * n + 55;
3650}
3651
3652static void
3653atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3654{
3655    tr_torrent * tor = NULL;
3656    tr_peerMgr * mgr = vmgr;
3657    managerLock( mgr );
3658
3659    while(( tor = tr_torrentNext( mgr->session, tor )))
3660    {
3661        int atomCount;
3662        Torrent * t = tor->torrentPeers;
3663        const int maxAtomCount = getMaxAtomCount( tor );
3664        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3665
3666        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3667        {
3668            int i;
3669            int keepCount = 0;
3670            int testCount = 0;
3671            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3672            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3673
3674            /* keep the ones that are in use */
3675            for( i=0; i<atomCount; ++i ) {
3676                struct peer_atom * atom = atoms[i];
3677                if( peerIsInUse( t, atom ) )
3678                    keep[keepCount++] = atom;
3679                else
3680                    test[testCount++] = atom;
3681            }
3682
3683            /* if there's room, keep the best of what's left */
3684            i = 0;
3685            if( keepCount < maxAtomCount ) {
3686                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3687                while( i<testCount && keepCount<maxAtomCount )
3688                    keep[keepCount++] = test[i++];
3689            }
3690
3691            /* free the culled atoms */
3692            while( i<testCount )
3693                tr_free( test[i++] );
3694
3695            /* rebuild Torrent.pool with what's left */
3696            tr_ptrArrayDestruct( &t->pool, NULL );
3697            t->pool = TR_PTR_ARRAY_INIT;
3698            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3699            for( i=0; i<keepCount; ++i )
3700                tr_ptrArrayAppend( &t->pool, keep[i] );
3701
3702            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3703
3704            /* cleanup */
3705            tr_free( test );
3706            tr_free( keep );
3707        }
3708    }
3709
3710    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3711    managerUnlock( mgr );
3712}
3713
3714/***
3715****
3716****
3717****
3718***/
3719
3720/* is this atom someone that we'd want to initiate a connection to? */
3721static bool
3722isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3723{
3724    /* not if we're both seeds */
3725    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3726        return false;
3727
3728    /* not if we've already got a connection to them... */
3729    if( peerIsInUse( tor->torrentPeers, atom ) )
3730        return false;
3731
3732    /* not if we just tried them already */
3733    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3734        return false;
3735
3736    /* not if they're blocklisted */
3737    if( isAtomBlocklisted( tor->session, atom ) )
3738        return false;
3739
3740    /* not if they're banned... */
3741    if( atom->flags2 & MYFLAG_BANNED )
3742        return false;
3743
3744    return true;
3745}
3746
3747struct peer_candidate
3748{
3749    uint64_t score;
3750    tr_torrent * tor;
3751    struct peer_atom * atom;
3752};
3753
3754static bool
3755torrentWasRecentlyStarted( const tr_torrent * tor )
3756{
3757    return difftime( tr_time( ), tor->startDate ) < 120;
3758}
3759
3760static inline uint64_t
3761addValToKey( uint64_t value, int width, uint64_t addme )
3762{
3763    value = (value << (uint64_t)width);
3764    value |= addme;
3765    return value;
3766}
3767
3768/* smaller value is better */
3769static uint64_t
3770getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3771{
3772    uint64_t i;
3773    uint64_t score = 0;
3774    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3775
3776    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3777    i = failed ? 1 : 0;
3778    score = addValToKey( score, 1, i );
3779
3780    /* prefer the one we attempted least recently (to cycle through all peers) */
3781    i = atom->lastConnectionAttemptAt;
3782    score = addValToKey( score, 32, i );
3783
3784    /* prefer peers belonging to a torrent of a higher priority */
3785    switch( tr_torrentGetPriority( tor ) ) {
3786        case TR_PRI_HIGH:    i = 0; break;
3787        case TR_PRI_NORMAL:  i = 1; break;
3788        case TR_PRI_LOW:     i = 2; break;
3789    }
3790    score = addValToKey( score, 4, i );
3791
3792    /* prefer recently-started torrents */
3793    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3794    score = addValToKey( score, 1, i );
3795
3796    /* prefer torrents we're downloading with */
3797    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3798    score = addValToKey( score, 1, i );
3799
3800    /* prefer peers that are known to be connectible */
3801    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3802    score = addValToKey( score, 1, i );
3803
3804    /* prefer peers that we might have a chance of uploading to...
3805       so lower seed probability is better */
3806    if( atom->seedProbability == 100 ) i = 101;
3807    else if( atom->seedProbability == -1 ) i = 100;
3808    else i = atom->seedProbability;
3809    score = addValToKey( score, 8, i );
3810
3811    /* Prefer peers that we got from more trusted sources.
3812     * lower `fromBest' values indicate more trusted sources */
3813    score = addValToKey( score, 4, atom->fromBest );
3814
3815    /* salt */
3816    score = addValToKey( score, 8, salt );
3817
3818    return score;
3819}
3820
3821/* sort an array of peer candidates */
3822static int
3823comparePeerCandidates( const void * va, const void * vb )
3824{
3825    const struct peer_candidate * a = va;
3826    const struct peer_candidate * b = vb;
3827
3828    if( a->score < b->score ) return -1;
3829    if( a->score > b->score ) return 1;
3830
3831    return 0;
3832}
3833
3834/** @return an array of all the atoms we might want to connect to */
3835static struct peer_candidate*
3836getPeerCandidates( tr_session * session, int * candidateCount )
3837{
3838    int atomCount;
3839    int peerCount;
3840    tr_torrent * tor;
3841    struct peer_candidate * candidates;
3842    struct peer_candidate * walk;
3843    const time_t now = tr_time( );
3844    const uint64_t now_msec = tr_time_msec( );
3845    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3846    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3847
3848    /* count how many peers and atoms we've got */
3849    tor= NULL;
3850    atomCount = 0;
3851    peerCount = 0;
3852    while(( tor = tr_torrentNext( session, tor ))) {
3853        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3854        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3855    }
3856
3857    /* don't start any new handshakes if we're full up */
3858    if( maxCandidates <= peerCount ) {
3859        *candidateCount = 0;
3860        return NULL;
3861    }
3862
3863    /* allocate an array of candidates */
3864    walk = candidates = tr_new( struct peer_candidate, atomCount );
3865
3866    /* populate the candidate array */
3867    tor = NULL;
3868    while(( tor = tr_torrentNext( session, tor )))
3869    {
3870        int i, nAtoms;
3871        struct peer_atom ** atoms;
3872
3873        if( !tor->torrentPeers->isRunning )
3874            continue;
3875
3876        /* if we've already got enough peers in this torrent... */
3877        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3878            continue;
3879
3880        /* if we've already got enough speed in this torrent... */
3881        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3882            continue;
3883
3884        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3885        for( i=0; i<nAtoms; ++i )
3886        {
3887            struct peer_atom * atom = atoms[i];
3888
3889            if( isPeerCandidate( tor, atom, now ) )
3890            {
3891                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3892                walk->tor = tor;
3893                walk->atom = atom;
3894                walk->score = getPeerCandidateScore( tor, atom, salt );
3895                ++walk;
3896            }
3897        }
3898    }
3899
3900    *candidateCount = walk - candidates;
3901    if( *candidateCount > 1 )
3902        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3903    return candidates;
3904}
3905
3906static void
3907initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3908{
3909    tr_peerIo * io;
3910    const time_t now = tr_time( );
3911    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3912
3913    if( atom->fromFirst == TR_PEER_FROM_PEX )
3914        /* PEX has explicit signalling for uTP support.  If an atom
3915           originally came from PEX and doesn't have the uTP flag, skip the
3916           uTP connection attempt.  Are we being optimistic here? */
3917        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3918
3919    tordbg( t, "Starting an OUTGOING%s connection with %s",
3920            utp ? " µTP" : "",
3921            tr_atomAddrStr( atom ) );
3922
3923    io = tr_peerIoNewOutgoing( mgr->session,
3924                               &mgr->session->bandwidth,
3925                               &atom->addr,
3926                               atom->port,
3927                               t->tor->info.hash,
3928                               t->tor->completeness == TR_SEED,
3929                               utp );
3930
3931    if( io == NULL )
3932    {
3933        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3934                tr_atomAddrStr( atom ) );
3935        atom->flags2 |= MYFLAG_UNREACHABLE;
3936        atom->numFails++;
3937    }
3938    else
3939    {
3940        tr_handshake * handshake = tr_handshakeNew( io,
3941                                                    mgr->session->encryptionMode,
3942                                                    myHandshakeDoneCB,
3943                                                    mgr );
3944
3945        assert( tr_peerIoGetTorrentHash( io ) );
3946
3947        tr_peerIoUnref( io ); /* balanced by the initial ref
3948                                 in tr_peerIoNewOutgoing() */
3949
3950        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3951                                 handshakeCompare );
3952    }
3953
3954    atom->lastConnectionAttemptAt = now;
3955    atom->time = now;
3956}
3957
3958static void
3959initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3960{
3961#if 0
3962    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3963             tr_atomAddrStr( c->atom ),
3964             tr_torrentName( c->tor ),
3965             (int)c->atom->seedProbability,
3966             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3967             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3968#endif
3969
3970    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3971}
3972
3973static void
3974makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3975{
3976    int i, n;
3977    struct peer_candidate * candidates;
3978
3979    candidates = getPeerCandidates( mgr->session, &n );
3980
3981    for( i=0; i<n && i<max; ++i )
3982        initiateCandidateConnection( mgr, &candidates[i] );
3983
3984    tr_free( candidates );
3985}
Note: See TracBrowser for help on using the repository browser.