source: trunk/libtransmission/peer-mgr.c @ 11948

Last change on this file since 11948 was 11948, checked in by jch, 11 years ago

When uTP is enabled, open uTP connections to some peers.

Since we don't implement falling back to TCP yet, we're very
conservative: we only use uTP when we have good reasons to believe
the peer speaks uTP.

  • Property svn:keywords set to Date Rev Author Id
File size: 114.1 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 11948 2011-02-18 00:43:24Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to cull old atoms */
46    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
47
48    /* how frequently to change which peers are choked */
49    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
50
51    /* an optimistically unchoked peer is immune from rechoking
52       for this many calls to rechokeUploads(). */
53    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
54
55    /* how frequently to reallocate bandwidth */
56    BANDWIDTH_PERIOD_MSEC = 500,
57
58    /* how frequently to age out old piece request lists */
59    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
60
61    /* how frequently to decide which peers live and die */
62    RECONNECT_PERIOD_MSEC = 500,
63
64    /* when many peers are available, keep idle ones this long */
65    MIN_UPLOAD_IDLE_SECS = ( 60 ),
66
67    /* when few peers are available, keep idle ones this long */
68    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
69
70    /* max number of peers to ask for per second overall.
71     * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 12,
73
74    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
75
76    /* number of bad pieces a peer is allowed to send before we ban them */
77    MAX_BAD_PIECES_PER_PEER = 5,
78
79    /* amount of time to keep a list of request pieces lying around
80       before it's considered too old and needs to be rebuilt */
81    PIECE_LIST_SHELF_LIFE_SECS = 60,
82
83    /* use for bitwise operations w/peer_atom.flags2 */
84    MYFLAG_BANNED = 1,
85
86    /* use for bitwise operations w/peer_atom.flags2 */
87    /* unreachable for now... but not banned.
88     * if they try to connect to us it's okay */
89    MYFLAG_UNREACHABLE = 2,
90
91    /* the minimum we'll wait before attempting to reconnect to a peer */
92    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
93
94    /** how long we'll let requests we've made linger before we cancel them */
95    REQUEST_TTL_SECS = 120,
96
97    NO_BLOCKS_CANCEL_HISTORY = 120,
98
99    CANCEL_HISTORY_SEC = 60
100};
101
102const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
108enum
109{
110    UPLOAD_ONLY_UKNOWN,
111    UPLOAD_ONLY_YES,
112    UPLOAD_ONLY_NO
113};
114
115/**
116 * Peer information that should be kept even before we've connected and
117 * after we've disconnected. These are kept in a pool of peer_atoms to decide
118 * which ones would make good candidates for connecting to, and to watch out
119 * for banned peers.
120 *
121 * @see tr_peer
122 * @see tr_peermsgs
123 */
124struct peer_atom
125{
126    uint8_t     fromFirst;          /* where the peer was first found */
127    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
128    uint8_t     flags;              /* these match the added_f flags */
129    uint8_t     flags2;             /* flags that aren't defined in added_f */
130    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
131    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
132    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */
133
134    tr_port     port;
135    uint16_t    numFails;
136    time_t      time;               /* when the peer's connection status last changed */
137    time_t      piece_data_time;
138
139    time_t      lastConnectionAttemptAt;
140    time_t      lastConnectionAt;
141
142    /* similar to a TTL field, but less rigid --
143     * if the swarm is small, the atom will be kept past this date. */
144    time_t      shelf_date;
145    tr_peer   * peer;               /* will be NULL if not connected */
146    tr_address  addr;
147};
148
149#ifdef NDEBUG
150#define tr_isAtom(a) (TRUE)
151#else
152static tr_bool
153tr_isAtom( const struct peer_atom * atom )
154{
155    return ( atom != NULL )
156        && ( atom->fromFirst < TR_PEER_FROM__MAX )
157        && ( atom->fromBest < TR_PEER_FROM__MAX )
158        && ( tr_isAddress( &atom->addr ) );
159}
160#endif
161
162static const char*
163tr_atomAddrStr( const struct peer_atom * atom )
164{
165    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
166}
167
168struct block_request
169{
170    tr_block_index_t block;
171    tr_peer * peer;
172    time_t sentAt;
173};
174
175struct weighted_piece
176{
177    tr_piece_index_t index;
178    int16_t salt;
179    int16_t requestCount;
180};
181
182enum piece_sort_state
183{
184    PIECES_UNSORTED,
185    PIECES_SORTED_BY_INDEX,
186    PIECES_SORTED_BY_WEIGHT
187};
188
189/** @brief Opaque, per-torrent data structure for peer connection information */
190typedef struct tr_torrent_peers
191{
192    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
193    tr_ptrArray                pool; /* struct peer_atom */
194    tr_ptrArray                peers; /* tr_peer */
195    tr_ptrArray                webseeds; /* tr_webseed */
196
197    tr_torrent               * tor;
198    struct tr_peerMgr        * manager;
199
200    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
201    int                        optimisticUnchokeTimeScaler;
202
203    tr_bool                    isRunning;
204    tr_bool                    needsCompletenessCheck;
205
206    struct block_request     * requests;
207    int                        requestCount;
208    int                        requestAlloc;
209
210    struct weighted_piece    * pieces;
211    int                        pieceCount;
212    enum piece_sort_state      pieceSortState;
213
214    /* An array of pieceCount items stating how many peers have each piece.
215       This is used to help us for downloading pieces "rarest first."
216       This may be NULL if we don't have metainfo yet, or if we're not
217       downloading and don't care about rarity */
218    uint16_t                 * pieceReplication;
219    size_t                     pieceReplicationSize;
220
221    int                        interestedCount;
222    int                        maxPeers;
223    time_t                     lastCancel;
224
225    /* Before the endgame this should be 0. In endgame, is contains the average
226     * number of pending requests per peer. Only peers which have more pending
227     * requests are considered 'fast' are allowed to request a block that's
228     * already been requested from another (slower?) peer. */
229    int                        endgame;
230}
231Torrent;
232
233struct tr_peerMgr
234{
235    tr_session    * session;
236    tr_ptrArray     incomingHandshakes; /* tr_handshake */
237    struct event  * bandwidthTimer;
238    struct event  * rechokeTimer;
239    struct event  * refillUpkeepTimer;
240    struct event  * atomTimer;
241};
242
243#define tordbg( t, ... ) \
244    do { \
245        if( tr_deepLoggingIsActive( ) ) \
246            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
247    } while( 0 )
248
249#define dbgmsg( ... ) \
250    do { \
251        if( tr_deepLoggingIsActive( ) ) \
252            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
253    } while( 0 )
254
255/**
256***
257**/
258
259static inline void
260managerLock( const struct tr_peerMgr * manager )
261{
262    tr_sessionLock( manager->session );
263}
264
265static inline void
266managerUnlock( const struct tr_peerMgr * manager )
267{
268    tr_sessionUnlock( manager->session );
269}
270
271static inline void
272torrentLock( Torrent * torrent )
273{
274    managerLock( torrent->manager );
275}
276
277static inline void
278torrentUnlock( Torrent * torrent )
279{
280    managerUnlock( torrent->manager );
281}
282
283static inline int
284torrentIsLocked( const Torrent * t )
285{
286    return tr_sessionIsLocked( t->manager->session );
287}
288
289/**
290***
291**/
292
293static int
294handshakeCompareToAddr( const void * va, const void * vb )
295{
296    const tr_handshake * a = va;
297
298    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
299}
300
301static int
302handshakeCompare( const void * a, const void * b )
303{
304    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
305}
306
307static inline tr_handshake*
308getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
309{
310    if( tr_ptrArrayEmpty( handshakes ) )
311        return NULL;
312
313    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
314}
315
316static int
317comparePeerAtomToAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * a = va;
320
321    return tr_compareAddresses( &a->addr, vb );
322}
323
324static int
325compareAtomsByAddress( const void * va, const void * vb )
326{
327    const struct peer_atom * b = vb;
328
329    assert( tr_isAtom( b ) );
330
331    return comparePeerAtomToAddress( va, &b->addr );
332}
333
334/**
335***
336**/
337
338const tr_address *
339tr_peerAddress( const tr_peer * peer )
340{
341    return &peer->atom->addr;
342}
343
344static Torrent*
345getExistingTorrent( tr_peerMgr *    manager,
346                    const uint8_t * hash )
347{
348    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
349
350    return tor == NULL ? NULL : tor->torrentPeers;
351}
352
353static int
354peerCompare( const void * a, const void * b )
355{
356    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
357}
358
359static struct peer_atom*
360getExistingAtom( const Torrent    * t,
361                 const tr_address * addr )
362{
363    Torrent * tt = (Torrent*)t;
364    assert( torrentIsLocked( t ) );
365    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
366}
367
368static tr_bool
369peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
370{
371    Torrent * t = (Torrent*) ct;
372
373    assert( torrentIsLocked ( t ) );
374
375    return ( atom->peer != NULL )
376        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
377        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
378}
379
380static tr_peer*
381peerConstructor( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new0( tr_peer, 1 );
384
385    tr_bitsetConstructor( &peer->have, 0 );
386
387    peer->atom = atom;
388    atom->peer = peer;
389
390    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
391    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
394
395    return peer;
396}
397
398static tr_peer*
399getPeer( Torrent * torrent, struct peer_atom * atom )
400{
401    tr_peer * peer;
402
403    assert( torrentIsLocked( torrent ) );
404
405    peer = atom->peer;
406
407    if( peer == NULL )
408    {
409        peer = peerConstructor( atom );
410        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
411    }
412
413    return peer;
414}
415
416static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
417
418static void
419peerDestructor( Torrent * t, tr_peer * peer )
420{
421    assert( peer != NULL );
422
423    peerDeclinedAllRequests( t, peer );
424
425    if( peer->msgs != NULL )
426        tr_peerMsgsFree( peer->msgs );
427
428    tr_peerIoClear( peer->io );
429    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
430
431    tr_historyFree( peer->blocksSentToClient  );
432    tr_historyFree( peer->blocksSentToPeer    );
433    tr_historyFree( peer->cancelsSentToClient );
434    tr_historyFree( peer->cancelsSentToPeer   );
435
436    tr_bitsetDestructor( &peer->have );
437    tr_bitfieldFree( peer->blame );
438    tr_free( peer->client );
439    peer->atom->peer = NULL;
440
441    tr_free( peer );
442}
443
444static tr_bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentDestructor( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentConstructor( tr_peerMgr * manager,
511                    tr_torrent * tor )
512{
513    int       i;
514    Torrent * t;
515
516    t = tr_new0( Torrent, 1 );
517    t->manager = manager;
518    t->tor = tor;
519    t->pool = TR_PTR_ARRAY_INIT;
520    t->peers = TR_PTR_ARRAY_INIT;
521    t->webseeds = TR_PTR_ARRAY_INIT;
522    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
523
524    for( i = 0; i < tor->info.webseedCount; ++i )
525    {
526        tr_webseed * w =
527            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
528        tr_ptrArrayAppend( &t->webseeds, w );
529    }
530
531    return t;
532}
533
534tr_peerMgr*
535tr_peerMgrNew( tr_session * session )
536{
537    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
538    m->session = session;
539    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
540    return m;
541}
542
543static void
544deleteTimer( struct event ** t )
545{
546    if( *t != NULL )
547    {
548        event_free( *t );
549        *t = NULL;
550    }
551}
552
553static void
554deleteTimers( struct tr_peerMgr * m )
555{
556    deleteTimer( &m->atomTimer );
557    deleteTimer( &m->bandwidthTimer );
558    deleteTimer( &m->rechokeTimer );
559    deleteTimer( &m->refillUpkeepTimer );
560}
561
562void
563tr_peerMgrFree( tr_peerMgr * manager )
564{
565    managerLock( manager );
566
567    deleteTimers( manager );
568
569    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
570     * the item from manager->handshakes, so this is a little roundabout... */
571    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
572        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
573
574    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
575
576    managerUnlock( manager );
577    tr_free( manager );
578}
579
580static int
581clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
582{
583    if( !tr_torrentHasMetadata( tor ) )
584        return TRUE;
585
586    return peer->clientIsInterested && !peer->clientIsChoked;
587}
588
589static int
590clientIsUploadingTo( const tr_peer * peer )
591{
592    return peer->peerIsInterested && !peer->peerIsChoked;
593}
594
595/***
596****
597***/
598
599void
600tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
601{
602    tr_torrent * tor = NULL;
603    tr_session * session = mgr->session;
604
605    /* we cache whether or not a peer is blocklisted...
606       since the blocklist has changed, erase that cached value */
607    while(( tor = tr_torrentNext( session, tor )))
608    {
609        int i;
610        Torrent * t = tor->torrentPeers;
611        const int n = tr_ptrArraySize( &t->pool );
612        for( i=0; i<n; ++i ) {
613            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
614            atom->blocklisted = -1;
615        }
616    }
617}
618
619static tr_bool
620isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
621{
622    if( atom->blocklisted < 0 )
623        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
624
625    assert( tr_isBool( atom->blocklisted ) );
626    return atom->blocklisted;
627}
628
629
630/***
631****
632***/
633
634static void
635atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
636{
637    assert( atom != NULL );
638    assert( -1<=seedProbability && seedProbability<=100 );
639
640    atom->seedProbability = seedProbability;
641
642    if( seedProbability == 100 )
643        atom->flags |= ADDED_F_SEED_FLAG;
644    else if( seedProbability != -1 )
645        atom->flags &= ~ADDED_F_SEED_FLAG;
646}
647
648static void
649atomSetSeed( struct peer_atom * atom )
650{
651    atomSetSeedProbability( atom, 100 );
652}
653
654static inline tr_bool
655atomIsSeed( const struct peer_atom * atom )
656{
657    return atom->seedProbability == 100;
658}
659
660tr_bool
661tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
662                      const tr_address  * addr )
663{
664    tr_bool isSeed = FALSE;
665    const Torrent * t = tor->torrentPeers;
666    const struct peer_atom * atom = getExistingAtom( t, addr );
667
668    if( atom )
669        isSeed = atomIsSeed( atom );
670
671    return isSeed;
672}
673
674void
675tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
676{
677    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
678
679    if( atom )
680        atom->flags |= ADDED_F_UTP_FLAGS;
681}
682
683
684/**
685***  REQUESTS
686***
687*** There are two data structures associated with managing block requests:
688***
689*** 1. Torrent::requests, an array of "struct block_request" which keeps
690***    track of which blocks have been requested, and when, and by which peers.
691***    This is list is used for (a) cancelling requests that have been pending
692***    for too long and (b) avoiding duplicate requests before endgame.
693***
694*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
695***    pieces that we want to request. It's used to decide which blocks to
696***    return next when tr_peerMgrGetBlockRequests() is called.
697**/
698
699/**
700*** struct block_request
701**/
702
703static int
704compareReqByBlock( const void * va, const void * vb )
705{
706    const struct block_request * a = va;
707    const struct block_request * b = vb;
708
709    /* primary key: block */
710    if( a->block < b->block ) return -1;
711    if( a->block > b->block ) return 1;
712
713    /* secondary key: peer */
714    if( a->peer < b->peer ) return -1;
715    if( a->peer > b->peer ) return 1;
716
717    return 0;
718}
719
720static void
721requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
722{
723    struct block_request key;
724
725    /* ensure enough room is available... */
726    if( t->requestCount + 1 >= t->requestAlloc )
727    {
728        const int CHUNK_SIZE = 128;
729        t->requestAlloc += CHUNK_SIZE;
730        t->requests = tr_renew( struct block_request,
731                                t->requests, t->requestAlloc );
732    }
733
734    /* populate the record we're inserting */
735    key.block = block;
736    key.peer = peer;
737    key.sentAt = tr_time( );
738
739    /* insert the request to our array... */
740    {
741        tr_bool exact;
742        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
743                                       sizeof( struct block_request ),
744                                       compareReqByBlock, &exact );
745        assert( !exact );
746        memmove( t->requests + pos + 1,
747                 t->requests + pos,
748                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
749        t->requests[pos] = key;
750    }
751
752    if( peer != NULL )
753    {
754        ++peer->pendingReqsToPeer;
755        assert( peer->pendingReqsToPeer >= 0 );
756    }
757
758    /*fprintf( stderr, "added request of block %lu from peer %s... "
759                       "there are now %d block\n",
760                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
761}
762
763static struct block_request *
764requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
765{
766    struct block_request key;
767    key.block = block;
768    key.peer = (tr_peer*) peer;
769
770    return bsearch( &key, t->requests, t->requestCount,
771                    sizeof( struct block_request ),
772                    compareReqByBlock );
773}
774
775/**
776 * Find the peers are we currently requesting the block
777 * with index @a block from and append them to @a peerArr.
778 */
779static void
780getBlockRequestPeers( Torrent * t, tr_block_index_t block,
781                      tr_ptrArray * peerArr )
782{
783    tr_bool exact;
784    int i, pos;
785    struct block_request key;
786
787    key.block = block;
788    key.peer = NULL;
789    pos = tr_lowerBound( &key, t->requests, t->requestCount,
790                         sizeof( struct block_request ),
791                         compareReqByBlock, &exact );
792
793    assert( !exact ); /* shouldn't have a request with .peer == NULL */
794
795    for( i = pos; i < t->requestCount; ++i )
796    {
797        if( t->requests[i].block != block )
798            break;
799        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
800    }
801}
802
803static void
804decrementPendingReqCount( const struct block_request * b )
805{
806    if( b->peer != NULL )
807        if( b->peer->pendingReqsToPeer > 0 )
808            --b->peer->pendingReqsToPeer;
809}
810
811static void
812requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
813{
814    const struct block_request * b = requestListLookup( t, block, peer );
815    if( b != NULL )
816    {
817        const int pos = b - t->requests;
818        assert( pos < t->requestCount );
819
820        decrementPendingReqCount( b );
821
822        tr_removeElementFromArray( t->requests,
823                                   pos,
824                                   sizeof( struct block_request ),
825                                   t->requestCount-- );
826
827        /*fprintf( stderr, "removing request of block %lu from peer %s... "
828                           "there are now %d block requests left\n",
829                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
830    }
831}
832
833static int
834countActiveWebseeds( const Torrent * t )
835{
836    int activeCount = 0;
837    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
838    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
839
840    for( ; w!=wend; ++w )
841        if( tr_webseedIsActive( *w ) )
842            ++activeCount;
843
844    return activeCount;
845}
846
847static void
848updateEndgame( Torrent * t )
849{
850    const tr_torrent * tor = t->tor;
851    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
852
853    assert( t->requestCount >= 0 );
854
855    if( (tr_block_index_t) t->requestCount < missing )
856    {
857        /* not in endgame */
858        t->endgame = 0;
859    }
860    else if( !t->endgame ) /* only recalculate when endgame first begins */
861    {
862        int numDownloading = 0;
863        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
864        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
865
866        /* add the active bittorrent peers... */
867        for( ; p!=pend; ++p )
868            if( (*p)->pendingReqsToPeer > 0 )
869                ++numDownloading;
870
871        /* add the active webseeds... */
872        numDownloading += countActiveWebseeds( t );
873
874        /* average number of pending requests per downloading peer */
875        t->endgame = t->requestCount / MAX( numDownloading, 1 );
876    }
877}
878
879
880/****
881*****
882*****  Piece List Manipulation / Accessors
883*****
884****/
885
886static inline void
887invalidatePieceSorting( Torrent * t )
888{
889    t->pieceSortState = PIECES_UNSORTED;
890}
891
892const tr_torrent * weightTorrent;
893
894const uint16_t * weightReplication;
895
896static void
897setComparePieceByWeightTorrent( Torrent * t )
898{
899    if( !replicationExists( t ) )
900        replicationNew( t );
901
902    weightTorrent = t->tor;
903    weightReplication = t->pieceReplication;
904}
905
906/* we try to create a "weight" s.t. high-priority pieces come before others,
907 * and that partially-complete pieces come before empty ones. */
908static int
909comparePieceByWeight( const void * va, const void * vb )
910{
911    const struct weighted_piece * a = va;
912    const struct weighted_piece * b = vb;
913    int ia, ib, missing, pending;
914    const tr_torrent * tor = weightTorrent;
915    const uint16_t * rep = weightReplication;
916
917    /* primary key: weight */
918    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
919    pending = a->requestCount;
920    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
921    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
922    pending = b->requestCount;
923    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
924    if( ia < ib ) return -1;
925    if( ia > ib ) return 1;
926
927    /* secondary key: higher priorities go first */
928    ia = tor->info.pieces[a->index].priority;
929    ib = tor->info.pieces[b->index].priority;
930    if( ia > ib ) return -1;
931    if( ia < ib ) return 1;
932
933    /* tertiary key: rarest first. */
934    ia = rep[a->index];
935    ib = rep[b->index];
936    if( ia < ib ) return -1;
937    if( ia > ib ) return 1;
938
939    /* quaternary key: random */
940    if( a->salt < b->salt ) return -1;
941    if( a->salt > b->salt ) return 1;
942
943    /* okay, they're equal */
944    return 0;
945}
946
947static int
948comparePieceByIndex( const void * va, const void * vb )
949{
950    const struct weighted_piece * a = va;
951    const struct weighted_piece * b = vb;
952    if( a->index < b->index ) return -1;
953    if( a->index > b->index ) return 1;
954    return 0;
955}
956
957static void
958pieceListSort( Torrent * t, enum piece_sort_state state )
959{
960    assert( state==PIECES_SORTED_BY_INDEX
961         || state==PIECES_SORTED_BY_WEIGHT );
962
963
964    if( state == PIECES_SORTED_BY_WEIGHT )
965    {
966        setComparePieceByWeightTorrent( t );
967        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
968    }
969    else
970        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
971
972    t->pieceSortState = state;
973}
974
975/**
976 * These functions are useful for testing, but too expensive for nightly builds.
977 * let's leave it disabled but add an easy hook to compile it back in
978 */
979#if 1
980#define assertWeightedPiecesAreSorted(t)
981#define assertReplicationCountIsExact(t)
982#else
983static void
984assertWeightedPiecesAreSorted( Torrent * t )
985{
986    if( !t->endgame )
987    {
988        int i;
989        setComparePieceByWeightTorrent( t );
990        for( i=0; i<t->pieceCount-1; ++i )
991            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
992    }
993}
994static void
995assertReplicationCountIsExact( Torrent * t )
996{
997    /* This assert might fail due to errors of implementations in other
998     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
999     * from a client. If a such a behavior is noticed,
1000     * a bug report should be filled to the faulty client. */
1001
1002    size_t piece_i;
1003    const uint16_t * rep = t->pieceReplication;
1004    const size_t piece_count = t->pieceReplicationSize;
1005    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1006    const int peer_count = tr_ptrArraySize( &t->peers );
1007
1008    assert( piece_count == t->tor->info.pieceCount );
1009
1010    for( piece_i=0; piece_i<piece_count; ++piece_i )
1011    {
1012        int peer_i;
1013        uint16_t r = 0;
1014
1015        for( peer_i=0; peer_i<peer_count; ++peer_i )
1016            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
1017                ++r;
1018
1019        assert( rep[piece_i] == r );
1020    }
1021}
1022#endif
1023
1024static struct weighted_piece *
1025pieceListLookup( Torrent * t, tr_piece_index_t index )
1026{
1027    int i;
1028
1029    for( i=0; i<t->pieceCount; ++i )
1030        if( t->pieces[i].index == index )
1031            return &t->pieces[i];
1032
1033    return NULL;
1034}
1035
1036static void
1037pieceListRebuild( Torrent * t )
1038{
1039
1040    if( !tr_torrentIsSeed( t->tor ) )
1041    {
1042        tr_piece_index_t i;
1043        tr_piece_index_t * pool;
1044        tr_piece_index_t poolCount = 0;
1045        const tr_torrent * tor = t->tor;
1046        const tr_info * inf = tr_torrentInfo( tor );
1047        struct weighted_piece * pieces;
1048        int pieceCount;
1049
1050        /* build the new list */
1051        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1052        for( i=0; i<inf->pieceCount; ++i )
1053            if( !inf->pieces[i].dnd )
1054                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1055                    pool[poolCount++] = i;
1056        pieceCount = poolCount;
1057        pieces = tr_new0( struct weighted_piece, pieceCount );
1058        for( i=0; i<poolCount; ++i ) {
1059            struct weighted_piece * piece = pieces + i;
1060            piece->index = pool[i];
1061            piece->requestCount = 0;
1062            piece->salt = tr_cryptoWeakRandInt( 4096 );
1063        }
1064
1065        /* if we already had a list of pieces, merge it into
1066         * the new list so we don't lose its requestCounts */
1067        if( t->pieces != NULL )
1068        {
1069            struct weighted_piece * o = t->pieces;
1070            struct weighted_piece * oend = o + t->pieceCount;
1071            struct weighted_piece * n = pieces;
1072            struct weighted_piece * nend = n + pieceCount;
1073
1074            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1075
1076            while( o!=oend && n!=nend ) {
1077                if( o->index < n->index )
1078                    ++o;
1079                else if( o->index > n->index )
1080                    ++n;
1081                else
1082                    *n++ = *o++;
1083            }
1084
1085            tr_free( t->pieces );
1086        }
1087
1088        t->pieces = pieces;
1089        t->pieceCount = pieceCount;
1090
1091        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1092
1093        /* cleanup */
1094        tr_free( pool );
1095    }
1096}
1097
1098static void
1099pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1100{
1101    struct weighted_piece * p;
1102
1103    if(( p = pieceListLookup( t, piece )))
1104    {
1105        const int pos = p - t->pieces;
1106
1107        tr_removeElementFromArray( t->pieces,
1108                                   pos,
1109                                   sizeof( struct weighted_piece ),
1110                                   t->pieceCount-- );
1111
1112        if( t->pieceCount == 0 )
1113        {
1114            tr_free( t->pieces );
1115            t->pieces = NULL;
1116        }
1117    }
1118}
1119
1120static void
1121pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1122{
1123    int pos;
1124    tr_bool isSorted = TRUE;
1125
1126    if( p == NULL )
1127        return;
1128
1129    /* is the torrent already sorted? */
1130    pos = p - t->pieces;
1131    setComparePieceByWeightTorrent( t );
1132    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1133        isSorted = FALSE;
1134    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1135        isSorted = FALSE;
1136
1137    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1138    {
1139       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1140       isSorted = TRUE;
1141    }
1142
1143    /* if it's not sorted, move it around */
1144    if( !isSorted )
1145    {
1146        tr_bool exact;
1147        const struct weighted_piece tmp = *p;
1148
1149        tr_removeElementFromArray( t->pieces,
1150                                   pos,
1151                                   sizeof( struct weighted_piece ),
1152                                   t->pieceCount-- );
1153
1154        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1155                             sizeof( struct weighted_piece ),
1156                             comparePieceByWeight, &exact );
1157
1158        memmove( &t->pieces[pos + 1],
1159                 &t->pieces[pos],
1160                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1161
1162        t->pieces[pos] = tmp;
1163    }
1164
1165    assertWeightedPiecesAreSorted( t );
1166}
1167
1168static void
1169pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1170{
1171    struct weighted_piece * p;
1172    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1173
1174    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1175    {
1176        --p->requestCount;
1177        pieceListResortPiece( t, p );
1178    }
1179}
1180
1181
1182/****
1183*****
1184*****  Replication count ( for rarest first policy )
1185*****
1186****/
1187
1188/**
1189 * Increase the replication count of this piece and sort it if the
1190 * piece list is already sorted
1191 */
1192static void
1193tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1194{
1195    assert( replicationExists( t ) );
1196    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1197
1198    /* One more replication of this piece is present in the swarm */
1199    ++t->pieceReplication[index];
1200
1201    /* we only resort the piece if the list is already sorted */
1202    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1203        pieceListResortPiece( t, pieceListLookup( t, index ) );
1204}
1205
1206/**
1207 * Increases the replication count of pieces present in the bitfield
1208 */
1209static void
1210tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1211{
1212    size_t i;
1213    uint16_t * rep = t->pieceReplication;
1214    const size_t n = t->tor->info.pieceCount;
1215
1216    assert( replicationExists( t ) );
1217    assert( n == t->pieceReplicationSize );
1218    assert( tr_bitfieldTestFast( b, n-1 ) );
1219
1220    for( i=0; i<n; ++i )
1221        if( tr_bitfieldHas( b, i ) )
1222            ++rep[i];
1223
1224    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1225        invalidatePieceSorting( t );
1226}
1227
1228/**
1229 * Increase the replication count of every piece
1230 */
1231static void
1232tr_incrReplication( Torrent * t )
1233{
1234    int i;
1235    const int n = t->pieceReplicationSize;
1236
1237    assert( replicationExists( t ) );
1238    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1239
1240    for( i=0; i<n; ++i )
1241        ++t->pieceReplication[i];
1242}
1243
1244/**
1245 * Decrease the replication count of pieces present in the bitset.
1246 */
1247static void
1248tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1249{
1250    int i;
1251    const int n = t->pieceReplicationSize;
1252
1253    assert( replicationExists( t ) );
1254    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1255
1256    if( bitset->haveAll )
1257    {
1258        for( i=0; i<n; ++i )
1259            --t->pieceReplication[i];
1260    }
1261    else if ( !bitset->haveNone )
1262    {
1263        const tr_bitfield * const b = &bitset->bitfield;
1264
1265        for( i=0; i<n; ++i )
1266            if( tr_bitfieldHas( b, i ) )
1267                --t->pieceReplication[i];
1268
1269        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1270            invalidatePieceSorting( t );
1271    }
1272}
1273
1274/**
1275***
1276**/
1277
1278void
1279tr_peerMgrRebuildRequests( tr_torrent * tor )
1280{
1281    assert( tr_isTorrent( tor ) );
1282
1283    pieceListRebuild( tor->torrentPeers );
1284}
1285
1286void
1287tr_peerMgrGetNextRequests( tr_torrent           * tor,
1288                           tr_peer              * peer,
1289                           int                    numwant,
1290                           tr_block_index_t     * setme,
1291                           int                  * numgot )
1292{
1293    int i;
1294    int got;
1295    Torrent * t;
1296    struct weighted_piece * pieces;
1297    const tr_bitset * have = &peer->have;
1298
1299    /* sanity clause */
1300    assert( tr_isTorrent( tor ) );
1301    assert( peer->clientIsInterested );
1302    assert( !peer->clientIsChoked );
1303    assert( numwant > 0 );
1304
1305    /* walk through the pieces and find blocks that should be requested */
1306    got = 0;
1307    t = tor->torrentPeers;
1308
1309    /* prep the pieces list */
1310    if( t->pieces == NULL )
1311        pieceListRebuild( t );
1312
1313    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1314        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1315
1316    assertReplicationCountIsExact( t );
1317    assertWeightedPiecesAreSorted( t );
1318
1319    updateEndgame( t );
1320    pieces = t->pieces;
1321    for( i=0; i<t->pieceCount && got<numwant; ++i )
1322    {
1323        struct weighted_piece * p = pieces + i;
1324
1325        /* if the peer has this piece that we want... */
1326        if( tr_bitsetHasFast( have, p->index ) )
1327        {
1328            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1329            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1330            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1331
1332            for( ; b!=e && got<numwant; ++b )
1333            {
1334                int peerCount;
1335                tr_peer ** peers;
1336
1337                /* don't request blocks we've already got */
1338                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1339                    continue;
1340
1341                /* always add peer if this block has no peers yet */
1342                tr_ptrArrayClear( &peerArr );
1343                getBlockRequestPeers( t, b, &peerArr );
1344                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1345                if( peerCount != 0 )
1346                {
1347                    /* don't make a second block request until the endgame */
1348                    if( !t->endgame )
1349                        continue;
1350
1351                    /* don't have more than two peers requesting this block */
1352                    if( peerCount > 1 )
1353                        continue;
1354
1355                    /* don't send the same request to the same peer twice */
1356                    if( peer == peers[0] )
1357                        continue;
1358
1359                    /* in the endgame allow an additional peer to download a
1360                       block but only if the peer seems to be handling requests
1361                       relatively fast */
1362                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1363                        continue;
1364                }
1365
1366                /* update the caller's table */
1367                setme[got++] = b;
1368
1369                /* update our own tables */
1370                requestListAdd( t, b, peer );
1371                ++p->requestCount;
1372            }
1373
1374            tr_ptrArrayDestruct( &peerArr, NULL );
1375        }
1376    }
1377
1378    /* In most cases we've just changed the weights of a small number of pieces.
1379     * So rather than qsort()ing the entire array, it's faster to apply an
1380     * adaptive insertion sort algorithm. */
1381    if( got > 0 )
1382    {
1383        /* not enough requests || last piece modified */
1384        if ( i == t->pieceCount ) --i;
1385
1386        setComparePieceByWeightTorrent( t );
1387        while( --i >= 0 )
1388        {
1389            tr_bool exact;
1390
1391            /* relative position! */
1392            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1393                                              t->pieceCount - (i + 1),
1394                                              sizeof( struct weighted_piece ),
1395                                              comparePieceByWeight, &exact );
1396            if( newpos > 0 )
1397            {
1398                const struct weighted_piece piece = t->pieces[i];
1399                memmove( &t->pieces[i],
1400                         &t->pieces[i + 1],
1401                         sizeof( struct weighted_piece ) * ( newpos ) );
1402                t->pieces[i + newpos] = piece;
1403            }
1404        }
1405    }
1406
1407    assertWeightedPiecesAreSorted( t );
1408    *numgot = got;
1409}
1410
1411tr_bool
1412tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1413                          const tr_peer     * peer,
1414                          tr_block_index_t    block )
1415{
1416    const Torrent * t = tor->torrentPeers;
1417    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1418}
1419
1420/* cancel requests that are too old */
1421static void
1422refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1423{
1424    time_t now;
1425    time_t too_old;
1426    tr_torrent * tor;
1427    tr_peerMgr * mgr = vmgr;
1428    managerLock( mgr );
1429
1430    now = tr_time( );
1431    too_old = now - REQUEST_TTL_SECS;
1432
1433    tor = NULL;
1434    while(( tor = tr_torrentNext( mgr->session, tor )))
1435    {
1436        Torrent * t = tor->torrentPeers;
1437        const int n = t->requestCount;
1438        if( n > 0 )
1439        {
1440            int keepCount = 0;
1441            int cancelCount = 0;
1442            struct block_request * cancel = tr_new( struct block_request, n );
1443            const struct block_request * it;
1444            const struct block_request * end;
1445
1446            for( it=t->requests, end=it+n; it!=end; ++it )
1447            {
1448                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1449                    cancel[cancelCount++] = *it;
1450                else
1451                {
1452                    if( it != &t->requests[keepCount] )
1453                        t->requests[keepCount] = *it;
1454                    keepCount++;
1455                }
1456            }
1457
1458            /* prune out the ones we aren't keeping */
1459            t->requestCount = keepCount;
1460
1461            /* send cancel messages for all the "cancel" ones */
1462            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1463                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1464                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1465                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1466                    decrementPendingReqCount( it );
1467                }
1468            }
1469
1470            /* decrement the pending request counts for the timed-out blocks */
1471            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1472                pieceListRemoveRequest( t, it->block );
1473
1474            /* cleanup loop */
1475            tr_free( cancel );
1476        }
1477    }
1478
1479    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1480    managerUnlock( mgr );
1481}
1482
1483static void
1484addStrike( Torrent * t, tr_peer * peer )
1485{
1486    tordbg( t, "increasing peer %s strike count to %d",
1487            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1488
1489    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1490    {
1491        struct peer_atom * atom = peer->atom;
1492        atom->flags2 |= MYFLAG_BANNED;
1493        peer->doPurge = 1;
1494        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1495    }
1496}
1497
1498static void
1499gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1500{
1501    tr_torrent *   tor = t->tor;
1502    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1503
1504    tor->corruptCur += byteCount;
1505    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1506
1507    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1508}
1509
1510static void
1511peerSuggestedPiece( Torrent            * t UNUSED,
1512                    tr_peer            * peer UNUSED,
1513                    tr_piece_index_t     pieceIndex UNUSED,
1514                    int                  isFastAllowed UNUSED )
1515{
1516#if 0
1517    assert( t );
1518    assert( peer );
1519    assert( peer->msgs );
1520
1521    /* is this a valid piece? */
1522    if(  pieceIndex >= t->tor->info.pieceCount )
1523        return;
1524
1525    /* don't ask for it if we've already got it */
1526    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1527        return;
1528
1529    /* don't ask for it if they don't have it */
1530    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1531        return;
1532
1533    /* don't ask for it if we're choked and it's not fast */
1534    if( !isFastAllowed && peer->clientIsChoked )
1535        return;
1536
1537    /* request the blocks that we don't have in this piece */
1538    {
1539        tr_block_index_t block;
1540        const tr_torrent * tor = t->tor;
1541        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1542        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1543
1544        for( block=start; block<end; ++block )
1545        {
1546            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1547            {
1548                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1549                const uint32_t length = tr_torBlockCountBytes( tor, block );
1550                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1551                incrementPieceRequests( t, pieceIndex );
1552            }
1553        }
1554    }
1555#endif
1556}
1557
1558static void
1559removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1560{
1561    requestListRemove( t, block, peer );
1562    pieceListRemoveRequest( t, block );
1563}
1564
1565/* peer choked us, or maybe it disconnected.
1566   either way we need to remove all its requests */
1567static void
1568peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1569{
1570    int i, n;
1571    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1572
1573    for( i=n=0; i<t->requestCount; ++i )
1574        if( peer == t->requests[i].peer )
1575            blocks[n++] = t->requests[i].block;
1576
1577    for( i=0; i<n; ++i )
1578        removeRequestFromTables( t, blocks[i], peer );
1579
1580    tr_free( blocks );
1581}
1582
1583static void
1584peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1585{
1586    Torrent * t = vt;
1587
1588    torrentLock( t );
1589
1590    switch( e->eventType )
1591    {
1592        case TR_PEER_PEER_GOT_DATA:
1593        {
1594            const time_t now = tr_time( );
1595            tr_torrent * tor = t->tor;
1596
1597            if( e->wasPieceData )
1598            {
1599                tor->uploadedCur += e->length;
1600                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1601                tr_torrentSetActivityDate( tor, now );
1602                tr_torrentSetDirty( tor );
1603            }
1604
1605            /* update the stats */
1606            if( e->wasPieceData )
1607                tr_statsAddUploaded( tor->session, e->length );
1608
1609            /* update our atom */
1610            if( peer && e->wasPieceData )
1611                peer->atom->piece_data_time = now;
1612
1613            break;
1614        }
1615
1616        case TR_PEER_CLIENT_GOT_HAVE:
1617            if( replicationExists( t ) ) {
1618                tr_incrReplicationOfPiece( t, e->pieceIndex );
1619                assertReplicationCountIsExact( t );
1620            }
1621            break;
1622
1623        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1624            if( replicationExists( t ) ) {
1625                tr_incrReplication( t );
1626                assertReplicationCountIsExact( t );
1627            }
1628            break;
1629
1630        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1631            /* noop */
1632            break;
1633
1634        case TR_PEER_CLIENT_GOT_BITFIELD:
1635            assert( e->bitfield != NULL );
1636            if( replicationExists( t ) ) {
1637                tr_incrReplicationFromBitfield( t, e->bitfield );
1638                assertReplicationCountIsExact( t );
1639            }
1640            break;
1641
1642        case TR_PEER_CLIENT_GOT_REJ:
1643            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1644            break;
1645
1646        case TR_PEER_CLIENT_GOT_CHOKE:
1647            peerDeclinedAllRequests( t, peer );
1648            break;
1649
1650        case TR_PEER_CLIENT_GOT_PORT:
1651            if( peer )
1652                peer->atom->port = e->port;
1653            break;
1654
1655        case TR_PEER_CLIENT_GOT_SUGGEST:
1656            if( peer )
1657                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1658            break;
1659
1660        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1661            if( peer )
1662                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_DATA:
1666        {
1667            const time_t now = tr_time( );
1668            tr_torrent * tor = t->tor;
1669
1670            if( e->wasPieceData )
1671            {
1672                tor->downloadedCur += e->length;
1673                tr_torrentSetActivityDate( tor, now );
1674                tr_torrentSetDirty( tor );
1675            }
1676
1677            /* update the stats */
1678            if( e->wasPieceData )
1679                tr_statsAddDownloaded( tor->session, e->length );
1680
1681            /* update our atom */
1682            if( peer && peer->atom && e->wasPieceData )
1683                peer->atom->piece_data_time = now;
1684
1685            break;
1686        }
1687
1688        case TR_PEER_PEER_PROGRESS:
1689        {
1690            if( peer )
1691            {
1692                struct peer_atom * atom = peer->atom;
1693                if( e->progress >= 1.0 ) {
1694                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1695                    atomSetSeed( atom );
1696                }
1697            }
1698            break;
1699        }
1700
1701        case TR_PEER_CLIENT_GOT_BLOCK:
1702        {
1703            tr_torrent * tor = t->tor;
1704            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1705            int i, peerCount;
1706            tr_peer ** peers;
1707            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1708
1709            removeRequestFromTables( t, block, peer );
1710            getBlockRequestPeers( t, block, &peerArr );
1711            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1712
1713            /* remove additional block requests and send cancel to peers */
1714            for( i=0; i<peerCount; i++ ) {
1715                tr_peer * p = peers[i];
1716                assert( p != peer );
1717                if( p->msgs ) {
1718                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1719                    tr_peerMsgsCancel( p->msgs, block );
1720                }
1721                removeRequestFromTables( t, block, p );
1722            }
1723
1724            tr_ptrArrayDestruct( &peerArr, FALSE );
1725
1726            if( peer && peer->blocksSentToClient )
1727                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1728
1729            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1730            {
1731                /* we already have this block... */
1732                const uint32_t n = tr_torBlockCountBytes( tor, block );
1733                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1734                tordbg( t, "we have this block already..." );
1735            }
1736            else
1737            {
1738                tr_cpBlockAdd( &tor->completion, block );
1739                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1740                tr_torrentSetDirty( tor );
1741
1742                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1743                {
1744                    const tr_piece_index_t p = e->pieceIndex;
1745                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1746
1747                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1748
1749                    if( !ok )
1750                    {
1751                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1752                                   (unsigned long)p );
1753                    }
1754
1755                    tr_peerMgrSetBlame( tor, p, ok );
1756
1757                    if( !ok )
1758                    {
1759                        gotBadPiece( t, p );
1760                    }
1761                    else
1762                    {
1763                        int i;
1764                        int peerCount;
1765                        tr_peer ** peers;
1766                        tr_file_index_t fileIndex;
1767
1768                        /* only add this to downloadedCur if we got it from a peer --
1769                         * webseeds shouldn't count against our ratio. As one tracker
1770                         * admin put it, "Those pieces are downloaded directly from the
1771                         * content distributor, not the peers, it is the tracker's job
1772                         * to manage the swarms, not the web server and does not fit
1773                         * into the jurisdiction of the tracker." */
1774                        if( peer->msgs != NULL ) {
1775                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1776                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1777                        }
1778
1779                        peerCount = tr_ptrArraySize( &t->peers );
1780                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1781                        for( i=0; i<peerCount; ++i )
1782                            tr_peerMsgsHave( peers[i]->msgs, p );
1783
1784                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1785                            const tr_file * file = &tor->info.files[fileIndex];
1786                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1787                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1788                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1789                                    tr_torrentFileCompleted( tor, fileIndex );
1790                                }
1791                            }
1792                        }
1793
1794                        pieceListRemovePiece( t, p );
1795                    }
1796                }
1797
1798                t->needsCompletenessCheck = TRUE;
1799            }
1800            break;
1801        }
1802
1803        case TR_PEER_ERROR:
1804            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1805            {
1806                /* some protocol error from the peer */
1807                peer->doPurge = 1;
1808                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1809                        tr_atomAddrStr( peer->atom ) );
1810            }
1811            else
1812            {
1813                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1814            }
1815            break;
1816
1817        default:
1818            assert( 0 );
1819    }
1820
1821    torrentUnlock( t );
1822}
1823
1824static int
1825getDefaultShelfLife( uint8_t from )
1826{
1827    /* in general, peers obtained from firsthand contact
1828     * are better than those from secondhand, etc etc */
1829    switch( from )
1830    {
1831        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1832        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1833        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1834        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1835        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1836        case TR_PEER_FROM_RESUME   : return 60 * 60;
1837        case TR_PEER_FROM_LPD      : return 10 * 60;
1838        default                    : return 60 * 60;
1839    }
1840}
1841
1842static void
1843ensureAtomExists( Torrent           * t,
1844                  const tr_address  * addr,
1845                  const tr_port       port,
1846                  const uint8_t       flags,
1847                  const int8_t        seedProbability,
1848                  const uint8_t       from )
1849{
1850    struct peer_atom * a;
1851
1852    assert( tr_isAddress( addr ) );
1853    assert( from < TR_PEER_FROM__MAX );
1854
1855    a = getExistingAtom( t, addr );
1856
1857    if( a == NULL )
1858    {
1859        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1860        a = tr_new0( struct peer_atom, 1 );
1861        a->addr = *addr;
1862        a->port = port;
1863        a->flags = flags;
1864        a->fromFirst = from;
1865        a->fromBest = from;
1866        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1867        a->blocklisted = -1;
1868        atomSetSeedProbability( a, seedProbability );
1869        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1870
1871        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1872    }
1873    else
1874    {
1875        if( from < a->fromBest )
1876            a->fromBest = from;
1877       
1878        if( a->seedProbability == -1 )
1879            atomSetSeedProbability( a, seedProbability );
1880
1881        a->flags |= flags;
1882    }
1883}
1884
1885static int
1886getMaxPeerCount( const tr_torrent * tor )
1887{
1888    return tor->maxConnectedPeers;
1889}
1890
1891static int
1892getPeerCount( const Torrent * t )
1893{
1894    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1895}
1896
1897/* FIXME: this is kind of a mess. */
1898static tr_bool
1899myHandshakeDoneCB( tr_handshake  * handshake,
1900                   tr_peerIo     * io,
1901                   tr_bool         readAnythingFromPeer,
1902                   tr_bool         isConnected,
1903                   const uint8_t * peer_id,
1904                   void          * vmanager )
1905{
1906    tr_bool            ok = isConnected;
1907    tr_bool            success = FALSE;
1908    tr_port            port;
1909    const tr_address * addr;
1910    tr_peerMgr       * manager = vmanager;
1911    Torrent          * t;
1912    tr_handshake     * ours;
1913
1914    assert( io );
1915    assert( tr_isBool( ok ) );
1916
1917    t = tr_peerIoHasTorrentHash( io )
1918        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1919        : NULL;
1920
1921    if( tr_peerIoIsIncoming ( io ) )
1922        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1923                                        handshake, handshakeCompare );
1924    else if( t )
1925        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1926                                        handshake, handshakeCompare );
1927    else
1928        ours = handshake;
1929
1930    assert( ours );
1931    assert( ours == handshake );
1932
1933    if( t )
1934        torrentLock( t );
1935
1936    addr = tr_peerIoGetAddress( io, &port );
1937
1938    if( !ok || !t || !t->isRunning )
1939    {
1940        if( t )
1941        {
1942            struct peer_atom * atom = getExistingAtom( t, addr );
1943            if( atom )
1944            {
1945                ++atom->numFails;
1946
1947                if( !readAnythingFromPeer )
1948                {
1949                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1950                    atom->flags2 |= MYFLAG_UNREACHABLE;
1951                }
1952            }
1953        }
1954    }
1955    else /* looking good */
1956    {
1957        struct peer_atom * atom;
1958
1959        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1960        atom = getExistingAtom( t, addr );
1961        atom->time = tr_time( );
1962        atom->piece_data_time = 0;
1963        atom->lastConnectionAt = tr_time( );
1964
1965        if( !tr_peerIoIsIncoming( io ) )
1966        {
1967            atom->flags |= ADDED_F_CONNECTABLE;
1968            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1969        }
1970
1971        /* In principle, this flag specifies whether the peer groks uTP,
1972           not whether it's currently connected over uTP. */
1973        if( io->utp_socket )
1974            atom->flags |= ADDED_F_UTP_FLAGS;
1975
1976        if( atom->flags2 & MYFLAG_BANNED )
1977        {
1978            tordbg( t, "banned peer %s tried to reconnect",
1979                    tr_atomAddrStr( atom ) );
1980        }
1981        else if( tr_peerIoIsIncoming( io )
1982               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1983
1984        {
1985        }
1986        else
1987        {
1988            tr_peer * peer = atom->peer;
1989
1990            if( peer ) /* we already have this peer */
1991            {
1992            }
1993            else
1994            {
1995                peer = getPeer( t, atom );
1996                tr_free( peer->client );
1997
1998                if( !peer_id )
1999                    peer->client = NULL;
2000                else {
2001                    char client[128];
2002                    tr_clientForId( client, sizeof( client ), peer_id );
2003                    peer->client = tr_strdup( client );
2004                }
2005
2006                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2007                                                                balanced by our unref in peerDestructor()  */
2008                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2009                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2010
2011                success = TRUE;
2012            }
2013        }
2014    }
2015
2016    if( t )
2017        torrentUnlock( t );
2018
2019    return success;
2020}
2021
2022void
2023tr_peerMgrAddIncoming( tr_peerMgr * manager,
2024                       tr_address * addr,
2025                       tr_port      port,
2026                       int          socket,
2027                       struct UTPSocket * utp_socket )
2028{
2029    tr_session * session;
2030
2031    managerLock( manager );
2032
2033    assert( tr_isSession( manager->session ) );
2034    session = manager->session;
2035
2036    if( tr_sessionIsAddressBlocked( session, addr ) )
2037    {
2038        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2039        if(socket >= 0)
2040            tr_netClose( session, socket );
2041        else
2042            UTP_Close( utp_socket );
2043    }
2044    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2045    {
2046        if(socket >= 0)
2047            tr_netClose( session, socket );
2048        else
2049            UTP_Close( utp_socket );
2050    }
2051    else /* we don't have a connection to them yet... */
2052    {
2053        tr_peerIo *    io;
2054        tr_handshake * handshake;
2055
2056        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2057
2058        handshake = tr_handshakeNew( io,
2059                                     session->encryptionMode,
2060                                     myHandshakeDoneCB,
2061                                     manager );
2062
2063        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2064
2065        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2066                                 handshakeCompare );
2067    }
2068
2069    managerUnlock( manager );
2070}
2071
2072void
2073tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2074                  const tr_pex * pex, int8_t seedProbability )
2075{
2076    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2077    {
2078        Torrent * t = tor->torrentPeers;
2079        managerLock( t->manager );
2080
2081        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2082            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2083                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2084
2085        managerUnlock( t->manager );
2086    }
2087}
2088
2089void
2090tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2091{
2092    Torrent * t = tor->torrentPeers;
2093    const int n = tr_ptrArraySize( &t->pool );
2094    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2095    struct peer_atom ** end = it + n;
2096
2097    while( it != end )
2098        atomSetSeed( *it++ );
2099}
2100
2101tr_pex *
2102tr_peerMgrCompactToPex( const void *    compact,
2103                        size_t          compactLen,
2104                        const uint8_t * added_f,
2105                        size_t          added_f_len,
2106                        size_t *        pexCount )
2107{
2108    size_t          i;
2109    size_t          n = compactLen / 6;
2110    const uint8_t * walk = compact;
2111    tr_pex *        pex = tr_new0( tr_pex, n );
2112
2113    for( i = 0; i < n; ++i )
2114    {
2115        pex[i].addr.type = TR_AF_INET;
2116        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2117        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2118        if( added_f && ( n == added_f_len ) )
2119            pex[i].flags = added_f[i];
2120    }
2121
2122    *pexCount = n;
2123    return pex;
2124}
2125
2126tr_pex *
2127tr_peerMgrCompact6ToPex( const void    * compact,
2128                         size_t          compactLen,
2129                         const uint8_t * added_f,
2130                         size_t          added_f_len,
2131                         size_t        * pexCount )
2132{
2133    size_t          i;
2134    size_t          n = compactLen / 18;
2135    const uint8_t * walk = compact;
2136    tr_pex *        pex = tr_new0( tr_pex, n );
2137
2138    for( i = 0; i < n; ++i )
2139    {
2140        pex[i].addr.type = TR_AF_INET6;
2141        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2142        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2143        if( added_f && ( n == added_f_len ) )
2144            pex[i].flags = added_f[i];
2145    }
2146
2147    *pexCount = n;
2148    return pex;
2149}
2150
2151tr_pex *
2152tr_peerMgrArrayToPex( const void * array,
2153                      size_t       arrayLen,
2154                      size_t      * pexCount )
2155{
2156    size_t          i;
2157    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2158    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2159    const uint8_t * walk = array;
2160    tr_pex        * pex = tr_new0( tr_pex, n );
2161
2162    for( i = 0 ; i < n ; i++ ) {
2163        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2164        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2165        pex[i].flags = 0x00;
2166        walk += sizeof( tr_address ) + 2;
2167    }
2168
2169    *pexCount = n;
2170    return pex;
2171}
2172
2173/**
2174***
2175**/
2176
2177void
2178tr_peerMgrSetBlame( tr_torrent     * tor,
2179                    tr_piece_index_t pieceIndex,
2180                    int              success )
2181{
2182    if( !success )
2183    {
2184        int        peerCount, i;
2185        Torrent *  t = tor->torrentPeers;
2186        tr_peer ** peers;
2187
2188        assert( torrentIsLocked( t ) );
2189
2190        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2191        for( i = 0; i < peerCount; ++i )
2192        {
2193            tr_peer * peer = peers[i];
2194            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2195            {
2196                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2197                        tr_atomAddrStr( peer->atom ),
2198                        pieceIndex, (int)peer->strikes + 1 );
2199                addStrike( t, peer );
2200            }
2201        }
2202    }
2203}
2204
2205int
2206tr_pexCompare( const void * va, const void * vb )
2207{
2208    const tr_pex * a = va;
2209    const tr_pex * b = vb;
2210    int i;
2211
2212    assert( tr_isPex( a ) );
2213    assert( tr_isPex( b ) );
2214
2215    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2216        return i;
2217
2218    if( a->port != b->port )
2219        return a->port < b->port ? -1 : 1;
2220
2221    return 0;
2222}
2223
2224#if 0
2225static int
2226peerPrefersCrypto( const tr_peer * peer )
2227{
2228    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2229        return TRUE;
2230
2231    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2232        return FALSE;
2233
2234    return tr_peerIoIsEncrypted( peer->io );
2235}
2236#endif
2237
2238/* better goes first */
2239static int
2240compareAtomsByUsefulness( const void * va, const void *vb )
2241{
2242    const struct peer_atom * a = * (const struct peer_atom**) va;
2243    const struct peer_atom * b = * (const struct peer_atom**) vb;
2244
2245    assert( tr_isAtom( a ) );
2246    assert( tr_isAtom( b ) );
2247
2248    if( a->piece_data_time != b->piece_data_time )
2249        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2250    if( a->fromBest != b->fromBest )
2251        return a->fromBest < b->fromBest ? -1 : 1;
2252    if( a->numFails != b->numFails )
2253        return a->numFails < b->numFails ? -1 : 1;
2254
2255    return 0;
2256}
2257
2258int
2259tr_peerMgrGetPeers( tr_torrent   * tor,
2260                    tr_pex      ** setme_pex,
2261                    uint8_t        af,
2262                    uint8_t        list_mode,
2263                    int            maxCount )
2264{
2265    int i;
2266    int n;
2267    int count = 0;
2268    int atomCount = 0;
2269    const Torrent * t = tor->torrentPeers;
2270    struct peer_atom ** atoms = NULL;
2271    tr_pex * pex;
2272    tr_pex * walk;
2273
2274    assert( tr_isTorrent( tor ) );
2275    assert( setme_pex != NULL );
2276    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2277    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2278
2279    managerLock( t->manager );
2280
2281    /**
2282    ***  build a list of atoms
2283    **/
2284
2285    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2286    {
2287        int i;
2288        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2289        atomCount = tr_ptrArraySize( &t->peers );
2290        atoms = tr_new( struct peer_atom *, atomCount );
2291        for( i=0; i<atomCount; ++i )
2292            atoms[i] = peers[i]->atom;
2293    }
2294    else /* TR_PEERS_ALL */
2295    {
2296        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2297        atomCount = tr_ptrArraySize( &t->pool );
2298        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2299    }
2300
2301    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2302
2303    /**
2304    ***  add the first N of them into our return list
2305    **/
2306
2307    n = MIN( atomCount, maxCount );
2308    pex = walk = tr_new0( tr_pex, n );
2309
2310    for( i=0; i<atomCount && count<n; ++i )
2311    {
2312        const struct peer_atom * atom = atoms[i];
2313        if( atom->addr.type == af )
2314        {
2315            assert( tr_isAddress( &atom->addr ) );
2316            walk->addr = atom->addr;
2317            walk->port = atom->port;
2318            walk->flags = atom->flags;
2319            ++count;
2320            ++walk;
2321        }
2322    }
2323
2324    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2325
2326    assert( ( walk - pex ) == count );
2327    *setme_pex = pex;
2328
2329    /* cleanup */
2330    tr_free( atoms );
2331    managerUnlock( t->manager );
2332    return count;
2333}
2334
2335static void atomPulse      ( int, short, void * );
2336static void bandwidthPulse ( int, short, void * );
2337static void rechokePulse   ( int, short, void * );
2338static void reconnectPulse ( int, short, void * );
2339
2340static struct event *
2341createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2342{
2343    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2344    tr_timerAddMsec( timer, msec );
2345    return timer;
2346}
2347
2348static void
2349ensureMgrTimersExist( struct tr_peerMgr * m )
2350{
2351    if( m->atomTimer == NULL )
2352        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2353
2354    if( m->bandwidthTimer == NULL )
2355        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2356
2357    if( m->rechokeTimer == NULL )
2358        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2359
2360    if( m->refillUpkeepTimer == NULL )
2361        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2362}
2363
2364void
2365tr_peerMgrStartTorrent( tr_torrent * tor )
2366{
2367    Torrent * t = tor->torrentPeers;
2368
2369    assert( tr_isTorrent( tor ) );
2370    assert( tr_torrentIsLocked( tor ) );
2371
2372    ensureMgrTimersExist( t->manager );
2373
2374    t->isRunning = TRUE;
2375    t->maxPeers = t->tor->maxConnectedPeers;
2376    t->pieceSortState = PIECES_UNSORTED;
2377
2378    rechokePulse( 0, 0, t->manager );
2379}
2380
2381static void
2382stopTorrent( Torrent * t )
2383{
2384    int i, n;
2385
2386    t->isRunning = FALSE;
2387
2388    replicationFree( t );
2389    invalidatePieceSorting( t );
2390
2391    /* disconnect the peers. */
2392    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2393        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2394    tr_ptrArrayClear( &t->peers );
2395
2396    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2397     * which removes the handshake from t->outgoingHandshakes... */
2398    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2399        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2400}
2401
2402void
2403tr_peerMgrStopTorrent( tr_torrent * tor )
2404{
2405    assert( tr_isTorrent( tor ) );
2406    assert( tr_torrentIsLocked( tor ) );
2407
2408    stopTorrent( tor->torrentPeers );
2409}
2410
2411void
2412tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2413{
2414    assert( tr_isTorrent( tor ) );
2415    assert( tr_torrentIsLocked( tor ) );
2416    assert( tor->torrentPeers == NULL );
2417
2418    tor->torrentPeers = torrentConstructor( manager, tor );
2419}
2420
2421void
2422tr_peerMgrRemoveTorrent( tr_torrent * tor )
2423{
2424    assert( tr_isTorrent( tor ) );
2425    assert( tr_torrentIsLocked( tor ) );
2426
2427    stopTorrent( tor->torrentPeers );
2428    torrentDestructor( tor->torrentPeers );
2429}
2430
2431void
2432tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2433{
2434    assert( tr_isTorrent( tor ) );
2435    assert( torrentIsLocked( tor->torrentPeers ) );
2436    assert( tab != NULL );
2437    assert( tabCount > 0 );
2438
2439    memset( tab, 0, tabCount );
2440
2441    if( tr_torrentHasMetadata( tor ) )
2442    {
2443        tr_piece_index_t i;
2444        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2445        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2446        const float interval = tor->info.pieceCount / (float)tabCount;
2447        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2448
2449        for( i=0; i<tabCount; ++i )
2450        {
2451            const int piece = i * interval;
2452
2453            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2454                tab[i] = -1;
2455            else if( peerCount ) {
2456                int j;
2457                for( j=0; j<peerCount; ++j )
2458                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2459                        ++tab[i];
2460            }
2461        }
2462    }
2463}
2464
2465/* Returns the pieces that are available from peers */
2466tr_bitfield*
2467tr_peerMgrGetAvailable( const tr_torrent * tor )
2468{
2469    int i;
2470    Torrent * t = tor->torrentPeers;
2471    const int peerCount = tr_ptrArraySize( &t->peers );
2472    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2473    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2474
2475    assert( tr_torrentIsLocked( tor ) );
2476
2477    for( i=0; i<peerCount; ++i )
2478        tr_bitsetOr( pieces, &peers[i]->have );
2479
2480    return pieces;
2481}
2482
2483void
2484tr_peerMgrTorrentStats( tr_torrent  * tor,
2485                        int         * setmePeersKnown,
2486                        int         * setmePeersConnected,
2487                        int         * setmeSeedsConnected,
2488                        int         * setmeWebseedsSendingToUs,
2489                        int         * setmePeersSendingToUs,
2490                        int         * setmePeersGettingFromUs,
2491                        int         * setmePeersFrom )
2492{
2493    int i, size;
2494    const Torrent * t = tor->torrentPeers;
2495    const tr_peer ** peers;
2496
2497    assert( tr_torrentIsLocked( tor ) );
2498
2499    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2500    size = tr_ptrArraySize( &t->peers );
2501
2502    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2503    *setmePeersConnected       = 0;
2504    *setmeSeedsConnected       = 0;
2505    *setmePeersGettingFromUs   = 0;
2506    *setmePeersSendingToUs     = 0;
2507    *setmeWebseedsSendingToUs  = 0;
2508
2509    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2510        setmePeersFrom[i] = 0;
2511
2512    for( i=0; i<size; ++i )
2513    {
2514        const tr_peer * peer = peers[i];
2515        const struct peer_atom * atom = peer->atom;
2516
2517        if( peer->io == NULL ) /* not connected */
2518            continue;
2519
2520        ++*setmePeersConnected;
2521
2522        ++setmePeersFrom[atom->fromFirst];
2523
2524        if( clientIsDownloadingFrom( tor, peer ) )
2525            ++*setmePeersSendingToUs;
2526
2527        if( clientIsUploadingTo( peer ) )
2528            ++*setmePeersGettingFromUs;
2529
2530        if( atomIsSeed( atom ) )
2531            ++*setmeSeedsConnected;
2532    }
2533
2534    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2535}
2536
2537int
2538tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2539{
2540    int i;
2541    int tmp;
2542    int ret = 0;
2543
2544    const Torrent * t = tor->torrentPeers;
2545    const int n = tr_ptrArraySize( &t->webseeds );
2546    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2547
2548    for( i=0; i<n; ++i )
2549        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2550            ret += tmp;
2551
2552    return ret;
2553}
2554
2555
2556double*
2557tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2558{
2559    int i;
2560    const Torrent * t = tor->torrentPeers;
2561    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2562    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2563    const uint64_t now = tr_time_msec( );
2564    double * ret = tr_new0( double, webseedCount );
2565
2566    assert( tr_isTorrent( tor ) );
2567    assert( tr_torrentIsLocked( tor ) );
2568    assert( t->manager != NULL );
2569    assert( webseedCount == tor->info.webseedCount );
2570
2571    for( i=0; i<webseedCount; ++i ) {
2572        int Bps;
2573        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2574            ret[i] = Bps / (double)tr_speed_K;
2575        else
2576            ret[i] = -1.0;
2577    }
2578
2579    return ret;
2580}
2581
2582int
2583tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2584{
2585    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2586}
2587
2588
2589struct tr_peer_stat *
2590tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2591{
2592    int i;
2593    const Torrent * t = tor->torrentPeers;
2594    const int size = tr_ptrArraySize( &t->peers );
2595    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2596    const uint64_t now_msec = tr_time_msec( );
2597    const time_t now = tr_time();
2598    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2599
2600    assert( tr_isTorrent( tor ) );
2601    assert( tr_torrentIsLocked( tor ) );
2602    assert( t->manager );
2603
2604    for( i=0; i<size; ++i )
2605    {
2606        char *                   pch;
2607        const tr_peer *          peer = peers[i];
2608        const struct peer_atom * atom = peer->atom;
2609        tr_peer_stat *           stat = ret + i;
2610
2611        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2612        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2613                   sizeof( stat->client ) );
2614        stat->port                = ntohs( peer->atom->port );
2615        stat->from                = atom->fromFirst;
2616        stat->progress            = peer->progress;
2617        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2618        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2619        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2620        stat->peerIsChoked        = peer->peerIsChoked;
2621        stat->peerIsInterested    = peer->peerIsInterested;
2622        stat->clientIsChoked      = peer->clientIsChoked;
2623        stat->clientIsInterested  = peer->clientIsInterested;
2624        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2625        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2626        stat->isUploadingTo       = clientIsUploadingTo( peer );
2627        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2628
2629        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2630        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2631        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2632        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2633
2634        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2635        stat->pendingReqsToClient = peer->pendingReqsToClient;
2636
2637        pch = stat->flagStr;
2638        if( peer->io->utp_socket != NULL) *pch++ = 'T';
2639        if( t->optimistic == peer ) *pch++ = 'O';
2640        if( stat->isDownloadingFrom ) *pch++ = 'D';
2641        else if( stat->clientIsInterested ) *pch++ = 'd';
2642        if( stat->isUploadingTo ) *pch++ = 'U';
2643        else if( stat->peerIsInterested ) *pch++ = 'u';
2644        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2645        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2646        if( stat->isEncrypted ) *pch++ = 'E';
2647        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2648        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2649        if( stat->isIncoming ) *pch++ = 'I';
2650        *pch = '\0';
2651    }
2652
2653    *setmeCount = size;
2654
2655    return ret;
2656}
2657
2658/**
2659***
2660**/
2661
2662void
2663tr_peerMgrClearInterest( tr_torrent * tor )
2664{
2665    int i;
2666    Torrent * t = tor->torrentPeers;
2667    const int peerCount = tr_ptrArraySize( &t->peers );
2668
2669    assert( tr_isTorrent( tor ) );
2670    assert( tr_torrentIsLocked( tor ) );
2671
2672    for( i=0; i<peerCount; ++i )
2673    {
2674        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2675        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2676    }
2677}
2678
2679/* do we still want this piece and does the peer have it? */
2680static tr_bool
2681isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2682{
2683    return ( !tor->info.pieces[index].dnd ) /* we want it */
2684        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2685        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2686}
2687
2688/* does this peer have any pieces that we want? */
2689static tr_bool
2690isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2691{
2692    tr_piece_index_t i, n;
2693
2694    if ( tr_torrentIsSeed( tor ) )
2695        return FALSE;
2696
2697    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2698        return FALSE;
2699
2700    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2701        if( isPieceInteresting( tor, peer, i ) )
2702            return TRUE;
2703
2704    return FALSE;
2705}
2706
2707/* determines who we send "interested" messages to */
2708static void
2709rechokeDownloads( Torrent * t )
2710{
2711    int i;
2712    const time_t now = tr_time( );
2713    const int MIN_INTERESTING_PEERS = 5;
2714    const int peerCount = tr_ptrArraySize( &t->peers );
2715    int maxPeers;
2716
2717    int badCount         = 0;
2718    int goodCount        = 0;
2719    int untestedCount    = 0;
2720    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2721    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2722    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2723
2724    /* decide how many peers to be interested in */
2725    {
2726        int blocks = 0;
2727        int cancels = 0;
2728        time_t timeSinceCancel;
2729
2730        /* Count up how many blocks & cancels each peer has.
2731         *
2732         * There are two situations where we send out cancels --
2733         *
2734         * 1. We've got unresponsive peers, which is handled by deciding
2735         *    -which- peers to be interested in.
2736         *
2737         * 2. We've hit our bandwidth cap, which is handled by deciding
2738         *    -how many- peers to be interested in.
2739         *
2740         * We're working on 2. here, so we need to ignore unresponsive
2741         * peers in our calculations lest they confuse Transmission into
2742         * thinking it's hit its bandwidth cap.
2743         */
2744        for( i=0; i<peerCount; ++i )
2745        {
2746            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2747            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2748            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2749
2750            if( b == 0 ) /* ignore unresponsive peers, as described above */
2751                continue;
2752
2753            blocks += b;
2754            cancels += c;
2755        }
2756
2757        if( cancels > 0 )
2758        {
2759            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2760             * higher values indicate more congestion. */
2761            const double cancelRate = cancels / (double)(cancels + blocks);
2762            const double mult = 1 - MIN( cancelRate, 0.5 );
2763            maxPeers = t->interestedCount * mult;
2764            tordbg( t, "cancel rate is %.3f -- reducing the "
2765                       "number of peers we're interested in by %.0f percent",
2766                       cancelRate, mult * 100 );
2767            t->lastCancel = now;
2768        }
2769
2770        timeSinceCancel = now - t->lastCancel;
2771        if( timeSinceCancel )
2772        {
2773            const int maxIncrease = 15;
2774            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2775            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2776            const int inc = maxIncrease * mult;
2777            maxPeers = t->maxPeers + inc;
2778            tordbg( t, "time since last cancel is %li -- increasing the "
2779                       "number of peers we're interested in by %d",
2780                       timeSinceCancel, inc );
2781        }
2782    }
2783
2784    /* don't let the previous section's number tweaking go too far... */
2785    if( maxPeers < MIN_INTERESTING_PEERS )
2786        maxPeers = MIN_INTERESTING_PEERS;
2787    if( maxPeers > t->tor->maxConnectedPeers )
2788        maxPeers = t->tor->maxConnectedPeers;
2789
2790    t->maxPeers = maxPeers;
2791
2792    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2793     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2794     * That's the order in which we'll choose who to show interest in */
2795    {
2796        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2797        int n = peerCount;
2798        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2799
2800        while( n > 0 )
2801        {
2802            const int i = tr_cryptoWeakRandInt( n );
2803            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2804
2805            if( !isPeerInteresting( t->tor, peer ) )
2806            {
2807                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2808            }
2809            else
2810            {
2811                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2812                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2813
2814                if( !blocks && !cancels )
2815                    untested[untestedCount++] = peer;
2816                else if( !cancels )
2817                    good[goodCount++] = peer;
2818                else if( !blocks )
2819                    bad[badCount++] = peer;
2820                else if( ( cancels * 10 ) < blocks )
2821                    good[goodCount++] = peer;
2822                else
2823                    bad[badCount++] = peer;
2824            }
2825
2826            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2827        }
2828
2829        tr_free( peers );
2830    }
2831
2832    t->interestedCount = 0;
2833
2834    /* We've decided (1) how many peers to be interested in,
2835     * and (2) which peers are the best candidates,
2836     * Now it's time to update our `interest' flags. */
2837    for( i=0; i<goodCount; ++i ) {
2838        const tr_bool b = t->interestedCount < maxPeers;
2839        tr_peerMsgsSetInterested( good[i]->msgs, b );
2840        if( b )
2841            ++t->interestedCount;
2842    }
2843    for( i=0; i<untestedCount; ++i ) {
2844        const tr_bool b = t->interestedCount < maxPeers;
2845        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2846        if( b )
2847            ++t->interestedCount;
2848    }
2849    for( i=0; i<badCount; ++i ) {
2850        const tr_bool b = t->interestedCount < maxPeers;
2851        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2852        if( b )
2853            ++t->interestedCount;
2854    }
2855
2856/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2857
2858    /* cleanup */
2859    tr_free( untested );
2860    tr_free( good );
2861    tr_free( bad );
2862}
2863
2864/**
2865***
2866**/
2867
2868struct ChokeData
2869{
2870    tr_bool         isInterested;
2871    tr_bool         wasChoked;
2872    tr_bool         isChoked;
2873    int             rate;
2874    int             salt;
2875    tr_peer *       peer;
2876};
2877
2878static int
2879compareChoke( const void * va,
2880              const void * vb )
2881{
2882    const struct ChokeData * a = va;
2883    const struct ChokeData * b = vb;
2884
2885    if( a->rate != b->rate ) /* prefer higher overall speeds */
2886        return a->rate > b->rate ? -1 : 1;
2887
2888    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2889        return a->wasChoked ? 1 : -1;
2890
2891    if( a->salt != b->salt ) /* random order */
2892        return a->salt - b->salt;
2893
2894    return 0;
2895}
2896
2897/* is this a new connection? */
2898static int
2899isNew( const tr_peer * peer )
2900{
2901    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2902}
2903
2904/* get a rate for deciding which peers to choke and unchoke. */
2905static int
2906getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2907{
2908    int Bps;
2909
2910    if( tr_torrentIsSeed( tor ) )
2911        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2912
2913    /* downloading a private torrent... take upload speed into account
2914     * because there may only be a small window of opportunity to share */
2915    else if( tr_torrentIsPrivate( tor ) )
2916        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2917            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2918
2919    /* downloading a public torrent */
2920    else
2921        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2922
2923    /* convert it to bytes per second */
2924    return Bps;
2925}
2926
2927static inline tr_bool
2928isBandwidthMaxedOut( const tr_bandwidth * b,
2929                     const uint64_t now_msec, tr_direction dir )
2930{
2931    if( !tr_bandwidthIsLimited( b, dir ) )
2932        return FALSE;
2933    else {
2934        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2935        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2936        return got >= want;
2937    }
2938}
2939
2940static void
2941rechokeUploads( Torrent * t, const uint64_t now )
2942{
2943    int i, size, unchokedInterested;
2944    const int peerCount = tr_ptrArraySize( &t->peers );
2945    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2946    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2947    const tr_session * session = t->manager->session;
2948    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2949    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2950
2951    assert( torrentIsLocked( t ) );
2952
2953    /* an optimistic unchoke peer's "optimistic"
2954     * state lasts for N calls to rechokeUploads(). */
2955    if( t->optimisticUnchokeTimeScaler > 0 )
2956        t->optimisticUnchokeTimeScaler--;
2957    else
2958        t->optimistic = NULL;
2959
2960    /* sort the peers by preference and rate */
2961    for( i = 0, size = 0; i < peerCount; ++i )
2962    {
2963        tr_peer * peer = peers[i];
2964        struct peer_atom * atom = peer->atom;
2965
2966        if( peer->progress >= 1.0 ) /* choke all seeds */
2967        {
2968            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2969        }
2970        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2971        {
2972            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2973        }
2974        else if( chokeAll ) /* choke everyone if we're not uploading */
2975        {
2976            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2977        }
2978        else if( peer != t->optimistic )
2979        {
2980            struct ChokeData * n = &choke[size++];
2981            n->peer         = peer;
2982            n->isInterested = peer->peerIsInterested;
2983            n->wasChoked    = peer->peerIsChoked;
2984            n->rate         = getRate( t->tor, atom, now );
2985            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
2986            n->isChoked     = TRUE;
2987        }
2988    }
2989
2990    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2991
2992    /**
2993     * Reciprocation and number of uploads capping is managed by unchoking
2994     * the N peers which have the best upload rate and are interested.
2995     * This maximizes the client's download rate. These N peers are
2996     * referred to as downloaders, because they are interested in downloading
2997     * from the client.
2998     *
2999     * Peers which have a better upload rate (as compared to the downloaders)
3000     * but aren't interested get unchoked. If they become interested, the
3001     * downloader with the worst upload rate gets choked. If a client has
3002     * a complete file, it uses its upload rate rather than its download
3003     * rate to decide which peers to unchoke.
3004     *
3005     * If our bandwidth is maxed out, don't unchoke any more peers.
3006     */
3007    unchokedInterested = 0;
3008    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3009        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3010        if( choke[i].isInterested )
3011            ++unchokedInterested;
3012    }
3013
3014    /* optimistic unchoke */
3015    if( !t->optimistic && !isMaxedOut && (i<size) )
3016    {
3017        int n;
3018        struct ChokeData * c;
3019        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3020
3021        for( ; i<size; ++i )
3022        {
3023            if( choke[i].isInterested )
3024            {
3025                const tr_peer * peer = choke[i].peer;
3026                int x = 1, y;
3027                if( isNew( peer ) ) x *= 3;
3028                for( y=0; y<x; ++y )
3029                    tr_ptrArrayAppend( &randPool, &choke[i] );
3030            }
3031        }
3032
3033        if(( n = tr_ptrArraySize( &randPool )))
3034        {
3035            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3036            c->isChoked = FALSE;
3037            t->optimistic = c->peer;
3038            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3039        }
3040
3041        tr_ptrArrayDestruct( &randPool, NULL );
3042    }
3043
3044    for( i=0; i<size; ++i )
3045        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3046
3047    /* cleanup */
3048    tr_free( choke );
3049}
3050
3051static void
3052rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3053{
3054    tr_torrent * tor = NULL;
3055    tr_peerMgr * mgr = vmgr;
3056    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3057
3058    managerLock( mgr );
3059
3060    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3061        if( tor->isRunning ) {
3062            rechokeUploads( tor->torrentPeers, now );
3063            if( !tr_torrentIsSeed( tor ) )
3064                rechokeDownloads( tor->torrentPeers );
3065        }
3066    }
3067
3068    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3069    managerUnlock( mgr );
3070}
3071
3072/***
3073****
3074****  Life and Death
3075****
3076***/
3077
3078typedef enum
3079{
3080    TR_CAN_KEEP,
3081    TR_CAN_CLOSE,
3082    TR_MUST_CLOSE,
3083}
3084tr_close_type_t;
3085
3086static tr_close_type_t
3087shouldPeerBeClosed( const Torrent    * t,
3088                    const tr_peer    * peer,
3089                    int                peerCount,
3090                    const time_t       now )
3091{
3092    const tr_torrent *       tor = t->tor;
3093    const struct peer_atom * atom = peer->atom;
3094
3095    /* if it's marked for purging, close it */
3096    if( peer->doPurge )
3097    {
3098        tordbg( t, "purging peer %s because its doPurge flag is set",
3099                tr_atomAddrStr( atom ) );
3100        return TR_MUST_CLOSE;
3101    }
3102
3103    /* if we're seeding and the peer has everything we have,
3104     * and enough time has passed for a pex exchange, then disconnect */
3105    if( tr_torrentIsSeed( tor ) )
3106    {
3107        tr_bool peerHasEverything;
3108
3109        if( atom->seedProbability != -1 )
3110        {
3111            peerHasEverything = atomIsSeed( atom );
3112        }
3113        else
3114        {
3115            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
3116            tr_bitsetDifference( tmp, &peer->have );
3117            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
3118            tr_bitfieldFree( tmp );
3119        }
3120
3121        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
3122        {
3123            tordbg( t, "purging peer %s because we're both seeds",
3124                    tr_atomAddrStr( atom ) );
3125            return TR_MUST_CLOSE;
3126        }
3127    }
3128
3129    /* disconnect if it's been too long since piece data has been transferred.
3130     * this is on a sliding scale based on number of available peers... */
3131    {
3132        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3133        /* if we have >= relaxIfFewerThan, strictness is 100%.
3134         * if we have zero connections, strictness is 0% */
3135        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3136                               ? 1.0
3137                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3138        const int lo = MIN_UPLOAD_IDLE_SECS;
3139        const int hi = MAX_UPLOAD_IDLE_SECS;
3140        const int limit = hi - ( ( hi - lo ) * strictness );
3141        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3142/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3143        if( idleTime > limit ) {
3144            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3145                       tr_atomAddrStr( atom ), idleTime );
3146            return TR_CAN_CLOSE;
3147        }
3148    }
3149
3150    return TR_CAN_KEEP;
3151}
3152
3153static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
3154
3155static tr_peer **
3156getPeersToClose( Torrent * t, tr_close_type_t closeType,
3157                 const uint64_t now_msec, const time_t now_sec,
3158                 int * setmeSize )
3159{
3160    int i, peerCount, outsize;
3161    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3162    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3163
3164    assert( torrentIsLocked( t ) );
3165
3166    for( i = outsize = 0; i < peerCount; ++i )
3167        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) == closeType )
3168            ret[outsize++] = peers[i];
3169
3170    sortPeersByLivelinessReverse ( ret, NULL, outsize, now_msec );
3171
3172    *setmeSize = outsize;
3173    return ret;
3174}
3175
3176static int
3177getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3178{
3179    int sec;
3180
3181    /* if we were recently connected to this peer and transferring piece
3182     * data, try to reconnect to them sooner rather that later -- we don't
3183     * want network troubles to get in the way of a good peer. */
3184    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3185        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3186
3187    /* don't allow reconnects more often than our minimum */
3188    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3189        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3190
3191    /* otherwise, the interval depends on how many times we've tried
3192     * and failed to connect to the peer */
3193    else switch( atom->numFails ) {
3194        case 0: sec = 0; break;
3195        case 1: sec = 5; break;
3196        case 2: sec = 2 * 60; break;
3197        case 3: sec = 15 * 60; break;
3198        case 4: sec = 30 * 60; break;
3199        case 5: sec = 60 * 60; break;
3200        default: sec = 120 * 60; break;
3201    }
3202
3203    /* penalize peers that were unreachable the last time we tried */
3204    if( atom->flags2 & MYFLAG_UNREACHABLE )
3205        sec += sec;
3206
3207    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3208    return sec;
3209}
3210
3211static void
3212removePeer( Torrent * t, tr_peer * peer )
3213{
3214    tr_peer * removed;
3215    struct peer_atom * atom = peer->atom;
3216
3217    assert( torrentIsLocked( t ) );
3218    assert( atom );
3219
3220    atom->time = tr_time( );
3221
3222    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3223
3224    if( replicationExists( t ) )
3225        tr_decrReplicationFromBitset( t, &peer->have );
3226
3227    assert( removed == peer );
3228    peerDestructor( t, removed );
3229}
3230
3231static void
3232closePeer( Torrent * t, tr_peer * peer )
3233{
3234    struct peer_atom * atom;
3235
3236    assert( t != NULL );
3237    assert( peer != NULL );
3238
3239    atom = peer->atom;
3240
3241    /* if we transferred piece data, then they might be good peers,
3242       so reset their `numFails' weight to zero. otherwise we connected
3243       to them fruitlessly, so mark it as another fail */
3244    if( atom->piece_data_time ) {
3245        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3246        atom->numFails = 0;
3247    } else {
3248        ++atom->numFails;
3249        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3250    }
3251
3252    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3253    removePeer( t, peer );
3254}
3255
3256static void
3257removeAllPeers( Torrent * t )
3258{
3259    while( !tr_ptrArrayEmpty( &t->peers ) )
3260        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3261}
3262
3263static void
3264closeBadPeers( Torrent * t, const uint64_t now_msec, const time_t now_sec )
3265{
3266    if( !t->isRunning )
3267    {
3268        removeAllPeers( t );
3269    }
3270    else
3271    {
3272        int i;
3273        int mustCloseCount;
3274        struct tr_peer ** mustClose;
3275
3276        /* disconnect the really bad peers */
3277        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now_msec, now_sec, &mustCloseCount );
3278        for( i=0; i<mustCloseCount; ++i )
3279            closePeer( t, mustClose[i] );
3280        tr_free( mustClose );
3281    }
3282}
3283
3284struct peer_liveliness
3285{
3286    tr_peer * peer;
3287    void * clientData;
3288    time_t pieceDataTime;
3289    time_t time;
3290    int speed;
3291    tr_bool doPurge;
3292};
3293
3294static int
3295comparePeerLiveliness( const void * va, const void * vb )
3296{
3297    const struct peer_liveliness * a = va;
3298    const struct peer_liveliness * b = vb;
3299
3300    if( a->doPurge != b->doPurge )
3301        return a->doPurge ? 1 : -1;
3302
3303    if( a->speed != b->speed ) /* faster goes first */
3304        return a->speed > b->speed ? -1 : 1;
3305
3306    /* the one to give us data more recently goes first */
3307    if( a->pieceDataTime != b->pieceDataTime )
3308        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3309
3310    /* the one we connected to most recently goes first */
3311    if( a->time != b->time )
3312        return a->time > b->time ? -1 : 1;
3313
3314    return 0;
3315}
3316
3317static int
3318comparePeerLivelinessReverse( const void * va, const void * vb )
3319{
3320    return -comparePeerLiveliness (va, vb);
3321}
3322
3323static void
3324sortPeersByLivelinessImpl( tr_peer  ** peers,
3325                           void     ** clientData,
3326                           int         n,
3327                           uint64_t    now,
3328                           int (*compare) ( const void *va, const void *vb ) )
3329{
3330    int i;
3331    struct peer_liveliness *lives, *l;
3332
3333    /* build a sortable array of peer + extra info */
3334    lives = l = tr_new0( struct peer_liveliness, n );
3335    for( i=0; i<n; ++i, ++l )
3336    {
3337        tr_peer * p = peers[i];
3338        l->peer = p;
3339        l->doPurge = p->doPurge;
3340        l->pieceDataTime = p->atom->piece_data_time;
3341        l->time = p->atom->time;
3342        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3343                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3344        if( clientData )
3345            l->clientData = clientData[i];
3346    }
3347
3348    /* sort 'em */
3349    assert( n == ( l - lives ) );
3350    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3351
3352    /* build the peer array */
3353    for( i=0, l=lives; i<n; ++i, ++l ) {
3354        peers[i] = l->peer;
3355        if( clientData )
3356            clientData[i] = l->clientData;
3357    }
3358    assert( n == ( l - lives ) );
3359
3360    /* cleanup */
3361    tr_free( lives );
3362}
3363
3364static void
3365sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3366{
3367    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3368}
3369
3370static void
3371sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3372{
3373    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
3374}
3375
3376
3377static void
3378enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3379{
3380    int n = tr_ptrArraySize( &t->peers );
3381    const int max = tr_torrentGetPeerLimit( t->tor );
3382    if( n > max )
3383    {
3384        void * base = tr_ptrArrayBase( &t->peers );
3385        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3386        sortPeersByLiveliness( peers, NULL, n, now );
3387        while( n > max )
3388            closePeer( t, peers[--n] );
3389        tr_free( peers );
3390    }
3391}
3392
3393static void
3394enforceSessionPeerLimit( tr_session * session, uint64_t now )
3395{
3396    int n = 0;
3397    tr_torrent * tor = NULL;
3398    const int max = tr_sessionGetPeerLimit( session );
3399
3400    /* count the total number of peers */
3401    while(( tor = tr_torrentNext( session, tor )))
3402        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3403
3404    /* if there are too many, prune out the worst */
3405    if( n > max )
3406    {
3407        tr_peer ** peers = tr_new( tr_peer*, n );
3408        Torrent ** torrents = tr_new( Torrent*, n );
3409
3410        /* populate the peer array */
3411        n = 0;
3412        tor = NULL;
3413        while(( tor = tr_torrentNext( session, tor ))) {
3414            int i;
3415            Torrent * t = tor->torrentPeers;
3416            const int tn = tr_ptrArraySize( &t->peers );
3417            for( i=0; i<tn; ++i, ++n ) {
3418                peers[n] = tr_ptrArrayNth( &t->peers, i );
3419                torrents[n] = t;
3420            }
3421        }
3422
3423        /* sort 'em */
3424        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3425
3426        /* cull out the crappiest */
3427        while( n-- > max )
3428            closePeer( torrents[n], peers[n] );
3429
3430        /* cleanup */
3431        tr_free( torrents );
3432        tr_free( peers );
3433    }
3434}
3435
3436static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3437
3438static void
3439reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3440{
3441    tr_torrent * tor;
3442    tr_peerMgr * mgr = vmgr;
3443    const time_t now_sec = tr_time( );
3444    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3445
3446    /**
3447    ***  enforce the per-session and per-torrent peer limits
3448    **/
3449
3450    /* if we're over the per-torrent peer limits, cull some peers */
3451    tor = NULL;
3452    while(( tor = tr_torrentNext( mgr->session, tor )))
3453        if( tor->isRunning )
3454            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3455
3456    /* if we're over the per-session peer limits, cull some peers */
3457    enforceSessionPeerLimit( mgr->session, now_msec );
3458
3459    /* remove crappy peers */
3460    tor = NULL;
3461    while(( tor = tr_torrentNext( mgr->session, tor )))
3462        closeBadPeers( tor->torrentPeers, now_msec, now_sec );
3463
3464    /* try to make new peer connections */
3465    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3466}
3467
3468/****
3469*****
3470*****  BANDWIDTH ALLOCATION
3471*****
3472****/
3473
3474static void
3475pumpAllPeers( tr_peerMgr * mgr )
3476{
3477    tr_torrent * tor = NULL;
3478
3479    while(( tor = tr_torrentNext( mgr->session, tor )))
3480    {
3481        int j;
3482        Torrent * t = tor->torrentPeers;
3483
3484        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3485        {
3486            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3487            tr_peerMsgsPulse( peer->msgs );
3488        }
3489    }
3490}
3491
3492static void
3493bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3494{
3495    tr_torrent * tor;
3496    tr_peerMgr * mgr = vmgr;
3497    managerLock( mgr );
3498
3499    /* FIXME: this next line probably isn't necessary... */
3500    pumpAllPeers( mgr );
3501
3502    /* allocate bandwidth to the peers */
3503    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3504    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3505
3506    /* possibly stop torrents that have seeded enough */
3507    tor = NULL;
3508    while(( tor = tr_torrentNext( mgr->session, tor )))
3509        tr_torrentCheckSeedLimit( tor );
3510
3511    /* run the completeness check for any torrents that need it */
3512    tor = NULL;
3513    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3514        if( tor->torrentPeers->needsCompletenessCheck ) {
3515            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3516            tr_torrentRecheckCompleteness( tor );
3517        }
3518    }
3519
3520    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3521     * during the peer-io callback call chain */
3522    tor = NULL;
3523    while(( tor = tr_torrentNext( mgr->session, tor )))
3524        if( tor->isStopping )
3525            tr_torrentStop( tor );
3526
3527    reconnectPulse( 0, 0, mgr );
3528
3529    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3530    managerUnlock( mgr );
3531}
3532
3533/***
3534****
3535***/
3536
3537static int
3538compareAtomPtrsByAddress( const void * va, const void *vb )
3539{
3540    const struct peer_atom * a = * (const struct peer_atom**) va;
3541    const struct peer_atom * b = * (const struct peer_atom**) vb;
3542
3543    assert( tr_isAtom( a ) );
3544    assert( tr_isAtom( b ) );
3545
3546    return tr_compareAddresses( &a->addr, &b->addr );
3547}
3548
3549/* best come first, worst go last */
3550static int
3551compareAtomPtrsByShelfDate( const void * va, const void *vb )
3552{
3553    time_t atime;
3554    time_t btime;
3555    const struct peer_atom * a = * (const struct peer_atom**) va;
3556    const struct peer_atom * b = * (const struct peer_atom**) vb;
3557    const int data_time_cutoff_secs = 60 * 60;
3558    const time_t tr_now = tr_time( );
3559
3560    assert( tr_isAtom( a ) );
3561    assert( tr_isAtom( b ) );
3562
3563    /* primary key: the last piece data time *if* it was within the last hour */
3564    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3565    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3566    if( atime != btime )
3567        return atime > btime ? -1 : 1;
3568
3569    /* secondary key: shelf date. */
3570    if( a->shelf_date != b->shelf_date )
3571        return a->shelf_date > b->shelf_date ? -1 : 1;
3572
3573    return 0;
3574}
3575
3576static int
3577getMaxAtomCount( const tr_torrent * tor )
3578{
3579    const int n = tor->maxConnectedPeers;
3580    /* approximate fit of the old jump discontinuous function */
3581    if( n >= 55 ) return     n + 150;
3582    if( n >= 20 ) return 2 * n + 95;
3583    return               4 * n + 55;
3584}
3585
3586static void
3587atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3588{
3589    tr_torrent * tor = NULL;
3590    tr_peerMgr * mgr = vmgr;
3591    managerLock( mgr );
3592
3593    while(( tor = tr_torrentNext( mgr->session, tor )))
3594    {
3595        int atomCount;
3596        Torrent * t = tor->torrentPeers;
3597        const int maxAtomCount = getMaxAtomCount( tor );
3598        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3599
3600        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3601        {
3602            int i;
3603            int keepCount = 0;
3604            int testCount = 0;
3605            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3606            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3607
3608            /* keep the ones that are in use */
3609            for( i=0; i<atomCount; ++i ) {
3610                struct peer_atom * atom = atoms[i];
3611                if( peerIsInUse( t, atom ) )
3612                    keep[keepCount++] = atom;
3613                else
3614                    test[testCount++] = atom;
3615            }
3616
3617            /* if there's room, keep the best of what's left */
3618            i = 0;
3619            if( keepCount < maxAtomCount ) {
3620                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3621                while( i<testCount && keepCount<maxAtomCount )
3622                    keep[keepCount++] = test[i++];
3623            }
3624
3625            /* free the culled atoms */
3626            while( i<testCount )
3627                tr_free( test[i++] );
3628
3629            /* rebuild Torrent.pool with what's left */
3630            tr_ptrArrayDestruct( &t->pool, NULL );
3631            t->pool = TR_PTR_ARRAY_INIT;
3632            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3633            for( i=0; i<keepCount; ++i )
3634                tr_ptrArrayAppend( &t->pool, keep[i] );
3635
3636            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3637
3638            /* cleanup */
3639            tr_free( test );
3640            tr_free( keep );
3641        }
3642    }
3643
3644    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3645    managerUnlock( mgr );
3646}
3647
3648/***
3649****
3650****
3651****
3652***/
3653
3654/* is this atom someone that we'd want to initiate a connection to? */
3655static tr_bool
3656isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3657{
3658    /* not if we're both seeds */
3659    if( tr_torrentIsSeed( tor ) )
3660        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3661            return FALSE;
3662
3663    /* not if we've already got a connection to them... */
3664    if( peerIsInUse( tor->torrentPeers, atom ) )
3665        return FALSE;
3666
3667    /* not if we just tried them already */
3668    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3669        return FALSE;
3670
3671    /* not if they're blocklisted */
3672    if( isAtomBlocklisted( tor->session, atom ) )
3673        return FALSE;
3674
3675    /* not if they're banned... */
3676    if( atom->flags2 & MYFLAG_BANNED )
3677        return FALSE;
3678
3679    return TRUE;
3680}
3681
3682struct peer_candidate
3683{
3684    uint64_t score;
3685    tr_torrent * tor;
3686    struct peer_atom * atom;
3687};
3688
3689static tr_bool
3690torrentWasRecentlyStarted( const tr_torrent * tor )
3691{
3692    return difftime( tr_time( ), tor->startDate ) < 120;
3693}
3694
3695static inline uint64_t
3696addValToKey( uint64_t value, int width, uint64_t addme )
3697{
3698    value = (value << (uint64_t)width);
3699    value |= addme;
3700    return value;
3701}
3702
3703/* smaller value is better */
3704static uint64_t
3705getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3706{
3707    uint64_t i;
3708    uint64_t score = 0;
3709    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3710
3711    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3712    i = failed ? 1 : 0;
3713    score = addValToKey( score, 1, i );
3714
3715    /* prefer the one we attempted least recently (to cycle through all peers) */
3716    i = atom->lastConnectionAttemptAt;
3717    score = addValToKey( score, 32, i );
3718
3719    /* prefer peers belonging to a torrent of a higher priority */
3720    switch( tr_torrentGetPriority( tor ) ) {
3721        case TR_PRI_HIGH:    i = 0; break;
3722        case TR_PRI_NORMAL:  i = 1; break;
3723        case TR_PRI_LOW:     i = 2; break;
3724    }
3725    score = addValToKey( score, 4, i );
3726
3727    /* prefer recently-started torrents */
3728    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3729    score = addValToKey( score, 1, i );
3730
3731    /* prefer torrents we're downloading with */
3732    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3733    score = addValToKey( score, 1, i );
3734
3735    /* prefer peers that are known to be connectible */
3736    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3737    score = addValToKey( score, 1, i );
3738
3739    /* prefer peers that we might have a chance of uploading to...
3740       so lower seed probability is better */
3741    if( atom->seedProbability == 100 ) i = 101;
3742    else if( atom->seedProbability == -1 ) i = 100;
3743    else i = atom->seedProbability;
3744    score = addValToKey( score, 8, i );
3745
3746    /* Prefer peers that we got from more trusted sources.
3747     * lower `fromBest' values indicate more trusted sources */
3748    score = addValToKey( score, 4, atom->fromBest );
3749
3750    /* salt */
3751    score = addValToKey( score, 8, salt );
3752
3753    return score;
3754}
3755
3756/* sort an array of peer candidates */
3757static int
3758comparePeerCandidates( const void * va, const void * vb )
3759{
3760    const struct peer_candidate * a = va;
3761    const struct peer_candidate * b = vb;
3762
3763    if( a->score < b->score ) return -1;
3764    if( a->score > b->score ) return 1;
3765
3766    return 0;
3767}
3768
3769/** @return an array of all the atoms we might want to connect to */
3770static struct peer_candidate*
3771getPeerCandidates( tr_session * session, int * candidateCount )
3772{
3773    int n;
3774    tr_torrent * tor;
3775    struct peer_candidate * candidates;
3776    struct peer_candidate * walk;
3777    const time_t now = tr_time( );
3778    const uint64_t now_msec = tr_time_msec( );
3779    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3780    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3781
3782    /* don't start any new handshakes if we're full up */
3783    n = 0;
3784    tor= NULL;
3785    while(( tor = tr_torrentNext( session, tor )))
3786        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3787    if( maxCandidates <= n ) {
3788        *candidateCount = 0;
3789        return NULL;
3790    }
3791
3792    /* allocate an array of candidates */
3793    n = 0;
3794    tor= NULL;
3795    while(( tor = tr_torrentNext( session, tor )))
3796        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3797    walk = candidates = tr_new( struct peer_candidate, n );
3798
3799    /* populate the candidate array */
3800    tor = NULL;
3801    while(( tor = tr_torrentNext( session, tor )))
3802    {
3803        int i, nAtoms;
3804        struct peer_atom ** atoms;
3805
3806        if( !tor->torrentPeers->isRunning )
3807            continue;
3808
3809        /* if we've already got enough peers in this torrent... */
3810        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3811            continue;
3812
3813        /* if we've already got enough speed in this torrent... */
3814        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3815            continue;
3816
3817        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3818        for( i=0; i<nAtoms; ++i )
3819        {
3820            struct peer_atom * atom = atoms[i];
3821
3822            if( isPeerCandidate( tor, atom, now ) )
3823            {
3824                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3825                walk->tor = tor;
3826                walk->atom = atom;
3827                walk->score = getPeerCandidateScore( tor, atom, salt );
3828                ++walk;
3829            }
3830        }
3831    }
3832
3833    *candidateCount = walk - candidates;
3834    if( *candidateCount > 1 )
3835        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3836    return candidates;
3837}
3838
3839static void
3840initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3841{
3842    tr_peerIo * io;
3843    const time_t now = tr_time( );
3844    tr_bool utp = FALSE;
3845
3846    /* Eventually, we'll want this to do something smart, like trying µTP
3847       first and eventually falling back to TCP.  For now, only open µTP
3848       connections if we have good reasons to believe the peer knows about
3849       ÂµTP. */
3850
3851    utp =
3852        tr_sessionIsUTPEnabled(mgr->session) &&
3853        (atom->flags & ADDED_F_UTP_FLAGS);
3854
3855    tordbg( t, "Starting an OUTGOING%s connection with %s",
3856            utp ? " µTP" : "",
3857            tr_atomAddrStr( atom ) );
3858
3859    io = tr_peerIoNewOutgoing( mgr->session,
3860                               mgr->session->bandwidth,
3861                               &atom->addr,
3862                               atom->port,
3863                               t->tor->info.hash,
3864                               t->tor->completeness == TR_SEED,
3865                               utp );
3866
3867    if( io == NULL )
3868    {
3869        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3870                tr_atomAddrStr( atom ) );
3871        atom->flags2 |= MYFLAG_UNREACHABLE;
3872        atom->numFails++;
3873    }
3874    else
3875    {
3876        tr_handshake * handshake = tr_handshakeNew( io,
3877                                                    mgr->session->encryptionMode,
3878                                                    myHandshakeDoneCB,
3879                                                    mgr );
3880
3881        assert( tr_peerIoGetTorrentHash( io ) );
3882
3883        tr_peerIoUnref( io ); /* balanced by the initial ref
3884                                 in tr_peerIoNewOutgoing() */
3885
3886        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3887                                 handshakeCompare );
3888    }
3889
3890    atom->lastConnectionAttemptAt = now;
3891    atom->time = now;
3892}
3893
3894static void
3895initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3896{
3897#if 0
3898    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3899             tr_atomAddrStr( c->atom ),
3900             tr_torrentName( c->tor ),
3901             (int)c->atom->seedProbability,
3902             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3903             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3904#endif
3905
3906    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3907}
3908
3909static void
3910makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3911{
3912    int i, n;
3913    struct peer_candidate * candidates;
3914
3915    candidates = getPeerCandidates( mgr->session, &n );
3916
3917    for( i=0; i<n && i<max; ++i )
3918        initiateCandidateConnection( mgr, &candidates[i] );
3919
3920    tr_free( candidates );
3921}
Note: See TracBrowser for help on using the repository browser.