source: trunk/libtransmission/peer-mgr.c @ 9651

Last change on this file since 9651 was 9651, checked in by charles, 12 years ago

(trunk libT) experimental: Reduce SO_SNDBUF and SO_RCVBUF for tracker announce/scrape messages. Reduce SO_RCVBUF for outgoing peer connections on seeding torrents.

  • Property svn:keywords set to Date Rev Author Id
File size: 87.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9651 2009-12-02 05:30:46Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "announcer.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to cull old atoms */
46    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
47
48    /* how frequently to change which peers are choked */
49    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
50
51    /* minimum interval for refilling peers' request lists */
52    REFILL_PERIOD_MSEC = 400,
53
54    /* how frequently to reallocate bandwidth */
55    BANDWIDTH_PERIOD_MSEC = 500,
56
57    /* how frequently to age out old piece request lists */
58    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
59
60    /* how frequently to decide which peers live and die */
61    RECONNECT_PERIOD_MSEC = 500,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = ( 60 ),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
68
69    /* max # of peers to ask fer per torrent per reconnect pulse */
70    MAX_RECONNECTIONS_PER_PULSE = 8,
71
72    /* max number of peers to ask for per second overall.
73    * this throttle is to avoid overloading the router */
74    MAX_CONNECTIONS_PER_SECOND = 16,
75
76    /* number of bad pieces a peer is allowed to send before we ban them */
77    MAX_BAD_PIECES_PER_PEER = 5,
78
79    /* amount of time to keep a list of request pieces lying around
80       before it's considered too old and needs to be rebuilt */
81    PIECE_LIST_SHELF_LIFE_SECS = 60,
82
83    /* use for bitwise operations w/peer_atom.myflags */
84    MYFLAG_BANNED = 1,
85
86    /* use for bitwise operations w/peer_atom.myflags */
87    /* unreachable for now... but not banned.
88     * if they try to connect to us it's okay */
89    MYFLAG_UNREACHABLE = 2,
90
91    /* the minimum we'll wait before attempting to reconnect to a peer */
92    MINIMUM_RECONNECT_INTERVAL_SECS = 5
93};
94
95
96/**
97***
98**/
99
100enum
101{
102    UPLOAD_ONLY_UKNOWN,
103    UPLOAD_ONLY_YES,
104    UPLOAD_ONLY_NO
105};
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    tr_peer   * peer;        /* will be NULL if not connected */
119    uint8_t     from;
120    uint8_t     flags;       /* these match the added_f flags */
121    uint8_t     myflags;     /* flags that aren't defined in added_f */
122    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
123    tr_port     port;
124    uint16_t    numFails;
125    tr_address  addr;
126    time_t      time;        /* when the peer's connection status last changed */
127    time_t      piece_data_time;
128
129    /* similar to a TTL field, but less rigid --
130     * if the swarm is small, the atom will be kept past this date. */
131    time_t      shelf_date;
132};
133
134static tr_bool
135tr_isAtom( const struct peer_atom * atom )
136{
137    return ( atom != NULL )
138        && ( atom->from < TR_PEER_FROM__MAX )
139        && ( tr_isAddress( &atom->addr ) );
140}
141
142static const char*
143tr_atomAddrStr( const struct peer_atom * atom )
144{
145    return tr_peerIoAddrStr( &atom->addr, atom->port );
146}
147
148struct block_request
149{
150    tr_block_index_t block;
151    tr_peer * peer;
152    time_t sentAt;
153};
154
155struct weighted_piece
156{
157    tr_piece_index_t index;
158    int16_t salt;
159    int16_t requestCount;
160};
161
162typedef struct tr_torrent_peers
163{
164    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
165    tr_ptrArray                pool; /* struct peer_atom */
166    tr_ptrArray                peers; /* tr_peer */
167    tr_ptrArray                webseeds; /* tr_webseed */
168
169    tr_torrent               * tor;
170    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
171    struct tr_peerMgr        * manager;
172
173    tr_bool                    isRunning;
174    tr_bool                    needsCompletenessCheck;
175
176    struct block_request     * requests;
177    int                        requestsSort;
178    int                        requestCount;
179    int                        requestAlloc;
180
181    struct weighted_piece    * pieces;
182    int                        piecesSort;
183    int                        pieceCount;
184
185    tr_bool                    isInEndgame;
186}
187Torrent;
188
189struct tr_peerMgr
190{
191    tr_session      * session;
192    tr_ptrArray       incomingHandshakes; /* tr_handshake */
193    tr_timer        * bandwidthTimer;
194    tr_timer        * rechokeTimer;
195    tr_timer        * reconnectTimer;
196    tr_timer        * refillUpkeepTimer;
197    tr_timer        * atomTimer;
198};
199
200#define tordbg( t, ... ) \
201    do { \
202        if( tr_deepLoggingIsActive( ) ) \
203            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
204    } while( 0 )
205
206#define dbgmsg( ... ) \
207    do { \
208        if( tr_deepLoggingIsActive( ) ) \
209            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
210    } while( 0 )
211
212/**
213***
214**/
215
216static TR_INLINE void
217managerLock( const struct tr_peerMgr * manager )
218{
219    tr_globalLock( manager->session );
220}
221
222static TR_INLINE void
223managerUnlock( const struct tr_peerMgr * manager )
224{
225    tr_globalUnlock( manager->session );
226}
227
228static TR_INLINE void
229torrentLock( Torrent * torrent )
230{
231    managerLock( torrent->manager );
232}
233
234static TR_INLINE void
235torrentUnlock( Torrent * torrent )
236{
237    managerUnlock( torrent->manager );
238}
239
240static TR_INLINE int
241torrentIsLocked( const Torrent * t )
242{
243    return tr_globalIsLocked( t->manager->session );
244}
245
246/**
247***
248**/
249
250static int
251handshakeCompareToAddr( const void * va, const void * vb )
252{
253    const tr_handshake * a = va;
254
255    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
256}
257
258static int
259handshakeCompare( const void * a, const void * b )
260{
261    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
262}
263
264static tr_handshake*
265getExistingHandshake( tr_ptrArray      * handshakes,
266                      const tr_address * addr )
267{
268    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
269}
270
271static int
272comparePeerAtomToAddress( const void * va, const void * vb )
273{
274    const struct peer_atom * a = va;
275
276    return tr_compareAddresses( &a->addr, vb );
277}
278
279static int
280compareAtomsByAddress( const void * va, const void * vb )
281{
282    const struct peer_atom * b = vb;
283
284    assert( tr_isAtom( b ) );
285
286    return comparePeerAtomToAddress( va, &b->addr );
287}
288
289/**
290***
291**/
292
293const tr_address *
294tr_peerAddress( const tr_peer * peer )
295{
296    return &peer->atom->addr;
297}
298
299static Torrent*
300getExistingTorrent( tr_peerMgr *    manager,
301                    const uint8_t * hash )
302{
303    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
304
305    return tor == NULL ? NULL : tor->torrentPeers;
306}
307
308static int
309peerCompare( const void * a, const void * b )
310{
311    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
312}
313
314static struct peer_atom*
315getExistingAtom( const Torrent    * t,
316                 const tr_address * addr )
317{
318    Torrent * tt = (Torrent*)t;
319    assert( torrentIsLocked( t ) );
320    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
321}
322
323static tr_bool
324peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
325{
326    Torrent * t = (Torrent*) ct;
327
328    assert( torrentIsLocked ( t ) );
329
330    return ( atom->peer != NULL )
331        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
332        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
333}
334
335static tr_peer*
336peerConstructor( struct peer_atom * atom )
337{
338    tr_peer * peer = tr_new0( tr_peer, 1 );
339    tr_bitsetConstructor( &peer->have, 0 );
340    peer->atom = atom;
341    atom->peer = peer;
342    return peer;
343}
344
345static tr_peer*
346getPeer( Torrent * torrent, struct peer_atom * atom )
347{
348    tr_peer * peer;
349
350    assert( torrentIsLocked( torrent ) );
351
352    peer = atom->peer;
353
354    if( peer == NULL )
355    {
356        peer = peerConstructor( atom );
357        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
358    }
359
360    return peer;
361}
362
363static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
364
365static void
366peerDestructor( Torrent * t, tr_peer * peer )
367{
368    assert( peer != NULL );
369
370    peerDeclinedAllRequests( t, peer );
371
372    if( peer->msgs != NULL )
373    {
374        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
375        tr_peerMsgsFree( peer->msgs );
376    }
377
378    tr_peerIoClear( peer->io );
379    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
380
381    tr_bitsetDestructor( &peer->have );
382    tr_bitfieldFree( peer->blame );
383    tr_free( peer->client );
384    peer->atom->peer = NULL;
385
386    tr_free( peer );
387}
388
389static void
390removePeer( Torrent * t, tr_peer * peer )
391{
392    tr_peer * removed;
393    struct peer_atom * atom = peer->atom;
394
395    assert( torrentIsLocked( t ) );
396    assert( atom );
397
398    atom->time = tr_time( );
399
400    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
401    assert( removed == peer );
402    peerDestructor( t, removed );
403}
404
405static void
406removeAllPeers( Torrent * t )
407{
408    while( !tr_ptrArrayEmpty( &t->peers ) )
409        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
410}
411
412static void
413torrentDestructor( void * vt )
414{
415    Torrent * t = vt;
416
417    assert( t );
418    assert( !t->isRunning );
419    assert( torrentIsLocked( t ) );
420    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
421    assert( tr_ptrArrayEmpty( &t->peers ) );
422
423    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
424    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
425    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
426    tr_ptrArrayDestruct( &t->peers, NULL );
427
428    tr_free( t->requests );
429    tr_free( t->pieces );
430    tr_free( t );
431}
432
433static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
434
435static Torrent*
436torrentConstructor( tr_peerMgr * manager,
437                    tr_torrent * tor )
438{
439    int       i;
440    Torrent * t;
441
442    t = tr_new0( Torrent, 1 );
443    t->manager = manager;
444    t->tor = tor;
445    t->pool = TR_PTR_ARRAY_INIT;
446    t->peers = TR_PTR_ARRAY_INIT;
447    t->webseeds = TR_PTR_ARRAY_INIT;
448    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
449    t->requests = 0;
450
451    for( i = 0; i < tor->info.webseedCount; ++i )
452    {
453        tr_webseed * w =
454            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
455        tr_ptrArrayAppend( &t->webseeds, w );
456    }
457
458    return t;
459}
460
461tr_peerMgr*
462tr_peerMgrNew( tr_session * session )
463{
464    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
465    m->session = session;
466    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
467    return m;
468}
469
470static void
471deleteTimers( struct tr_peerMgr * m )
472{
473    if( m->atomTimer )
474        tr_timerFree( &m->atomTimer );
475
476    if( m->bandwidthTimer )
477        tr_timerFree( &m->bandwidthTimer );
478
479    if( m->rechokeTimer )
480        tr_timerFree( &m->rechokeTimer );
481
482    if( m->reconnectTimer )
483        tr_timerFree( &m->reconnectTimer );
484
485    if( m->refillUpkeepTimer )
486        tr_timerFree( &m->refillUpkeepTimer );
487}
488
489void
490tr_peerMgrFree( tr_peerMgr * manager )
491{
492    managerLock( manager );
493
494    deleteTimers( manager );
495
496    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
497     * the item from manager->handshakes, so this is a little roundabout... */
498    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
499        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
500
501    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
502
503    managerUnlock( manager );
504    tr_free( manager );
505}
506
507static int
508clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
509{
510    if( !tr_torrentHasMetadata( tor ) )
511        return TRUE;
512
513    return peer->clientIsInterested && !peer->clientIsChoked;
514}
515
516static int
517clientIsUploadingTo( const tr_peer * peer )
518{
519    return peer->peerIsInterested && !peer->peerIsChoked;
520}
521
522/***
523****
524***/
525
526tr_bool
527tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
528                      const tr_address  * addr )
529{
530    tr_bool isSeed = FALSE;
531    const Torrent * t = tor->torrentPeers;
532    const struct peer_atom * atom = getExistingAtom( t, addr );
533
534    if( atom )
535        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
536
537    return isSeed;
538}
539
540/**
541***  REQUESTS
542***
543*** There are two data structures associated with managing block requests:
544***
545*** 1. Torrent::requests, an array of "struct block_request" which keeps
546***    track of which blocks have been requested, and when, and by which peers.
547***    This is list is used for (a) cancelling requests that have been pending
548***    for too long and (b) avoiding duplicate requests before endgame.
549***
550*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
551***    pieces that we want to request.  It's used to decide which pieces to
552***    return next when tr_peerMgrGetBlockRequests() is called.
553**/
554
555/**
556*** struct block_request
557**/
558
559enum
560{
561    REQ_UNSORTED,
562    REQ_SORTED_BY_BLOCK,
563    REQ_SORTED_BY_TIME
564};
565
566static int
567compareReqByBlock( const void * va, const void * vb )
568{
569    const struct block_request * a = va;
570    const struct block_request * b = vb;
571    if( a->block < b->block ) return -1;
572    if( a->block > b->block ) return 1;
573    return 0;
574}
575
576static int
577compareReqByTime( const void * va, const void * vb )
578{
579    const struct block_request * a = va;
580    const struct block_request * b = vb;
581    if( a->sentAt < b->sentAt ) return -1;
582    if( a->sentAt > b->sentAt ) return 1;
583    return 0;
584}
585
586static void
587requestListSort( Torrent * t, int mode )
588{
589    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
590
591    if( t->requestsSort != mode )
592    {
593        int(*compar)(const void *, const void *);
594
595        t->requestsSort = mode;
596
597        switch( mode ) {
598            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
599            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
600            default: assert( 0 && "unhandled" );
601        }
602
603        qsort( t->requests, t->requestCount,
604               sizeof( struct block_request ), compar );
605    }
606}
607
608static void
609requestListAdd( Torrent * t, const time_t now, tr_block_index_t block, tr_peer * peer )
610{
611    struct block_request key;
612
613    /* ensure enough room is available... */
614    if( t->requestCount + 1 >= t->requestAlloc )
615    {
616        const int CHUNK_SIZE = 128;
617        t->requestAlloc += CHUNK_SIZE;
618        t->requests = tr_renew( struct block_request,
619                                t->requests, t->requestAlloc );
620    }
621
622    /* populate the record we're inserting */
623    key.block = block;
624    key.peer = peer;
625    key.sentAt = now;
626
627    /* insert the request to our array... */
628    switch( t->requestsSort )
629    {
630        case REQ_UNSORTED:
631        case REQ_SORTED_BY_TIME:
632            t->requests[t->requestCount++] = key;
633            break;
634
635        case REQ_SORTED_BY_BLOCK: {
636            tr_bool exact;
637            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
638                                           sizeof( struct block_request ),
639                                           compareReqByBlock, &exact );
640            assert( !exact );
641            memmove( t->requests + pos + 1,
642                     t->requests + pos,
643                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
644            t->requests[pos] = key;
645            break;
646        }
647    }
648}
649
650static struct block_request *
651requestListLookup( Torrent * t, tr_block_index_t block )
652{
653    struct block_request key;
654    key.block = block;
655
656    requestListSort( t, REQ_SORTED_BY_BLOCK );
657
658    return bsearch( &key, t->requests, t->requestCount,
659                    sizeof( struct block_request ),
660                    compareReqByBlock );
661}
662
663static void
664requestListRemove( Torrent * t, tr_block_index_t block )
665{
666    const struct block_request * b = requestListLookup( t, block );
667    if( b != NULL )
668    {
669        const int pos = b - t->requests;
670        assert( pos < t->requestCount );
671        memmove( t->requests + pos,
672                 t->requests + pos + 1,
673                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
674    }
675}
676
677/**
678*** struct weighted_piece
679**/
680
681enum
682{
683    PIECES_UNSORTED,
684    PIECES_SORTED_BY_INDEX,
685    PIECES_SORTED_BY_WEIGHT
686};
687
688const tr_torrent * weightTorrent;
689
690/* we try to create a "weight" s.t. high-priority pieces come before others,
691 * and that partially-complete pieces come before empty ones. */
692static int
693comparePieceByWeight( const void * va, const void * vb )
694{
695    const struct weighted_piece * a = va;
696    const struct weighted_piece * b = vb;
697    int ia, ib, missing, pending;
698    const tr_torrent * tor = weightTorrent;
699
700    /* primary key: weight */
701    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
702    pending = a->requestCount;
703    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
704    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
705    pending = b->requestCount;
706    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
707    if( ia < ib ) return -1;
708    if( ia > ib ) return 1;
709
710    /* secondary key: higher priorities go first */
711    ia = tor->info.pieces[a->index].priority;
712    ib = tor->info.pieces[b->index].priority;
713    if( ia > ib ) return -1;
714    if( ia < ib ) return 1;
715
716    /* tertiary key: random */
717    return a->salt - b->salt;
718}
719
720static int
721comparePieceByIndex( const void * va, const void * vb )
722{
723    const struct weighted_piece * a = va;
724    const struct weighted_piece * b = vb;
725    if( a->index < b->index ) return -1;
726    if( a->index > b->index ) return 1;
727    return 0;
728}
729
730static void
731pieceListSort( Torrent * t, int mode )
732{
733    int(*compar)(const void *, const void *);
734
735    assert( mode==PIECES_SORTED_BY_INDEX
736         || mode==PIECES_SORTED_BY_WEIGHT );
737
738    if( t->piecesSort != mode )
739    {
740        t->piecesSort = mode;
741
742        switch( mode ) {
743            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
744            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
745            default: assert( 0 && "unhandled" );  break;
746        }
747
748        weightTorrent = t->tor;
749        qsort( t->pieces, t->pieceCount,
750               sizeof( struct weighted_piece ), compar );
751    }
752
753    /* Also, as long as we've got the pieces sorted by weight,
754     * let's also update t.isInEndgame */
755    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
756    {
757        tr_bool endgame = TRUE;
758
759        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
760        {
761            const tr_completion * cp = &t->tor->completion;
762            const struct weighted_piece * p = t->pieces;
763            const int pending = p->requestCount;
764            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
765            endgame = pending >= missing;
766        }
767
768        t->isInEndgame = endgame;
769    }
770}
771
772static struct weighted_piece *
773pieceListLookup( Torrent * t, tr_piece_index_t index )
774{
775    struct weighted_piece key;
776    key.index = index;
777
778    pieceListSort( t, PIECES_SORTED_BY_INDEX );
779
780    return bsearch( &key, t->pieces, t->pieceCount,
781                    sizeof( struct weighted_piece ),
782                    comparePieceByIndex );
783}
784
785static void
786pieceListRebuild( Torrent * t )
787{
788    if( !tr_torrentIsSeed( t->tor ) )
789    {
790        tr_piece_index_t i;
791        tr_piece_index_t * pool;
792        tr_piece_index_t poolCount = 0;
793        const tr_torrent * tor = t->tor;
794        const tr_info * inf = tr_torrentInfo( tor );
795        struct weighted_piece * pieces;
796        int pieceCount;
797
798        /* build the new list */
799        pool = tr_new( tr_piece_index_t, inf->pieceCount );
800        for( i=0; i<inf->pieceCount; ++i )
801            if( !inf->pieces[i].dnd )
802                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
803                    pool[poolCount++] = i;
804        pieceCount = poolCount;
805        pieces = tr_new0( struct weighted_piece, pieceCount );
806        for( i=0; i<poolCount; ++i ) {
807            struct weighted_piece * piece = pieces + i;
808            piece->index = pool[i];
809            piece->requestCount = 0;
810            piece->salt = tr_cryptoWeakRandInt( 255 );
811        }
812
813        /* if we already had a list of pieces, merge it into
814         * the new list so we don't lose its requestCounts */
815        if( t->pieces != NULL )
816        {
817            struct weighted_piece * o = t->pieces;
818            struct weighted_piece * oend = o + t->pieceCount;
819            struct weighted_piece * n = pieces;
820            struct weighted_piece * nend = n + pieceCount;
821
822            pieceListSort( t, PIECES_SORTED_BY_INDEX );
823
824            while( o!=oend && n!=nend ) {
825                if( o->index < n->index )
826                    ++o;
827                else if( o->index > n->index )
828                    ++n;
829                else
830                    *n++ = *o++;
831            }
832
833            tr_free( t->pieces );
834        }
835
836        t->pieces = pieces;
837        t->pieceCount = pieceCount;
838        t->piecesSort = PIECES_SORTED_BY_INDEX;
839
840        /* cleanup */
841        tr_free( pool );
842    }
843}
844
845static void
846pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
847{
848    struct weighted_piece * p = pieceListLookup( t, piece );
849
850    if( p != NULL )
851    {
852        const int pos = p - t->pieces;
853
854        memmove( t->pieces + pos,
855                 t->pieces + pos + 1,
856                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
857
858        if( t->pieceCount == 0 )
859        {
860            tr_free( t->pieces );
861            t->pieces = NULL;
862        }
863    }
864}
865
866static void
867pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
868{
869    struct weighted_piece * p;
870    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
871
872    if(( p = pieceListLookup( t, index )))
873        if( p->requestCount > 0 )
874            --p->requestCount;
875
876    /* note: this invalidates the weighted.piece.weight field,
877     * but that's OK since the call to pieceListLookup ensured
878     * that we were sorted by index anyway.. next time we resort
879     * by weight, pieceListSort() will update the weights */
880}
881
882/**
883***
884**/
885
886void
887tr_peerMgrRebuildRequests( tr_torrent * tor )
888{
889    assert( tr_isTorrent( tor ) );
890
891    pieceListRebuild( tor->torrentPeers );
892}
893
894void
895tr_peerMgrGetNextRequests( tr_torrent           * tor,
896                           tr_peer              * peer,
897                           int                    numwant,
898                           tr_block_index_t     * setme,
899                           int                  * numgot )
900{
901    int i;
902    int got;
903    Torrent * t;
904    struct weighted_piece * pieces;
905    const tr_bitset * have = &peer->have;
906    const time_t now = tr_time( );
907
908    /* sanity clause */
909    assert( tr_isTorrent( tor ) );
910    assert( numwant > 0 );
911
912    /* walk through the pieces and find blocks that should be requested */
913    got = 0;
914    t = tor->torrentPeers;
915
916    /* prep the pieces list */
917    if( t->pieces == NULL )
918        pieceListRebuild( t );
919    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
920
921    pieces = t->pieces;
922    for( i=0; i<t->pieceCount && got<numwant; ++i )
923    {
924        struct weighted_piece * p = pieces + i;
925
926        /* if the peer has this piece that we want... */
927        if( tr_bitsetHasFast( have, p->index ) )
928        {
929            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
930            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
931
932            for( ; b!=e && got<numwant; ++b )
933            {
934                struct block_request * breq;
935
936                /* don't request blocks we've already got */
937                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
938                    continue;
939
940                /* don't request blocks we've already requested (FIXME) */
941                breq = requestListLookup( t, b );
942                if( breq != NULL ) {
943                    assert( breq->peer != NULL );
944                    if( breq->peer == peer ) continue;
945                    if( !t->isInEndgame ) continue;
946                }
947
948                setme[got++] = b;
949
950                /* update our own tables */
951                if( breq == NULL )
952                    requestListAdd( t, now, b, peer );
953                ++p->requestCount;
954            }
955        }
956    }
957
958    /* We almost always change only a handful of pieces in the array.
959     * In these cases, it's cheaper to sort those changed pieces and merge,
960     * than qsort()ing the whole array again */
961    if( got > 0 )
962    {
963        struct weighted_piece * p;
964        struct weighted_piece * pieces;
965        struct weighted_piece * a = t->pieces;
966        struct weighted_piece * a_end = t->pieces + i;
967        struct weighted_piece * b = a_end;
968        struct weighted_piece * b_end = t->pieces + t->pieceCount;
969
970        /* rescore the pieces that we changed */
971        weightTorrent = t->tor;
972        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
973
974        /* allocate a new array */
975        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
976
977        /* merge the two sorted arrays into this new array */
978        weightTorrent = t->tor;
979        while( a!=a_end && b!=b_end )
980            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
981        while( a!=a_end ) *p++ = *a++;
982        while( b!=b_end ) *p++ = *b++;
983
984#if 0
985        /* make sure we did it right */
986        assert( p - pieces == t->pieceCount );
987        for( it=pieces; it+1<p; ++it )
988            assert( it->weight <= it[1].weight );
989#endif
990
991        /* update */
992        tr_free( t->pieces );
993        t->pieces = pieces;
994    }
995
996    *numgot = got;
997}
998
999tr_bool
1000tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1001                          const tr_peer     * peer,
1002                          tr_block_index_t    block )
1003{
1004    const Torrent * t = tor->torrentPeers;
1005    const struct block_request * b = requestListLookup( (Torrent*)t, block );
1006    if( b == NULL ) return FALSE;
1007    if( b->peer == peer ) return TRUE;
1008    if( t->isInEndgame ) return TRUE;
1009    return FALSE;
1010}
1011
1012/* cancel requests that are too old */
1013static int
1014refillUpkeep( void * vmgr )
1015{
1016    time_t now;
1017    time_t too_old;
1018    tr_torrent * tor;
1019    tr_peerMgr * mgr = vmgr;
1020    managerLock( mgr );
1021
1022    now = tr_time( );
1023    too_old = now - REQUEST_TTL_SECS;
1024
1025    tor = NULL;
1026    while(( tor = tr_torrentNext( mgr->session, tor )))
1027    {
1028        Torrent * t = tor->torrentPeers;
1029        const int n = t->requestCount;
1030        if( n > 0 )
1031        {
1032            int keepCount = 0;
1033            int cancelCount = 0;
1034            struct block_request * keep = tr_new( struct block_request, n );
1035            struct block_request * cancel = tr_new( struct block_request, n );
1036            const struct block_request * it;
1037            const struct block_request * end;
1038
1039            for( it=t->requests, end=it+n; it!=end; ++it )
1040                if( it->sentAt <= too_old )
1041                    cancel[cancelCount++] = *it;
1042                else
1043                    keep[keepCount++] = *it;
1044
1045            /* prune out the ones we aren't keeping */
1046            tr_free( t->requests );
1047            t->requests = keep;
1048            t->requestCount = keepCount;
1049            t->requestAlloc = n;
1050
1051            /* send cancel messages for all the "cancel" ones */
1052            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1053                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1054                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1055
1056            /* decrement the pending request counts for the timed-out blocks */
1057            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1058                pieceListRemoveRequest( t, it->block );
1059
1060            /* cleanup loop */
1061            tr_free( cancel );
1062        }
1063    }
1064
1065    managerUnlock( mgr );
1066    return TRUE;
1067}
1068
1069static void
1070addStrike( Torrent * t, tr_peer * peer )
1071{
1072    tordbg( t, "increasing peer %s strike count to %d",
1073            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1074
1075    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1076    {
1077        struct peer_atom * atom = peer->atom;
1078        atom->myflags |= MYFLAG_BANNED;
1079        peer->doPurge = 1;
1080        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1081    }
1082}
1083
1084static void
1085gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1086{
1087    tr_torrent *   tor = t->tor;
1088    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1089
1090    tor->corruptCur += byteCount;
1091    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1092}
1093
1094static void
1095peerSuggestedPiece( Torrent            * t UNUSED,
1096                    tr_peer            * peer UNUSED,
1097                    tr_piece_index_t     pieceIndex UNUSED,
1098                    int                  isFastAllowed UNUSED )
1099{
1100#if 0
1101    assert( t );
1102    assert( peer );
1103    assert( peer->msgs );
1104
1105    /* is this a valid piece? */
1106    if(  pieceIndex >= t->tor->info.pieceCount )
1107        return;
1108
1109    /* don't ask for it if we've already got it */
1110    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1111        return;
1112
1113    /* don't ask for it if they don't have it */
1114    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1115        return;
1116
1117    /* don't ask for it if we're choked and it's not fast */
1118    if( !isFastAllowed && peer->clientIsChoked )
1119        return;
1120
1121    /* request the blocks that we don't have in this piece */
1122    {
1123        tr_block_index_t block;
1124        const tr_torrent * tor = t->tor;
1125        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1126        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1127
1128        for( block=start; block<end; ++block )
1129        {
1130            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1131            {
1132                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1133                const uint32_t length = tr_torBlockCountBytes( tor, block );
1134                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1135                incrementPieceRequests( t, pieceIndex );
1136            }
1137        }
1138    }
1139#endif
1140}
1141
1142static void
1143decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1144{
1145    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1146}
1147
1148static void
1149clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1150{
1151    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1152}
1153
1154static void
1155removeRequestFromTables( Torrent * t, tr_block_index_t block )
1156{
1157    requestListRemove( t, block );
1158    pieceListRemoveRequest( t, block );
1159}
1160
1161/* peer choked us, or maybe it disconnected.
1162   either way we need to remove all its requests */
1163static void
1164peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1165{
1166    int i, n;
1167    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1168
1169    for( i=n=0; i<t->requestCount; ++i )
1170        if( peer == t->requests[i].peer )
1171            blocks[n++] = t->requests[i].block;
1172
1173    for( i=0; i<n; ++i )
1174        removeRequestFromTables( t, blocks[i] );
1175
1176    tr_free( blocks );
1177}
1178
1179static void
1180peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1181{
1182    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1183    Torrent * t = vt;
1184    const tr_peer_event * e = vevent;
1185
1186    torrentLock( t );
1187
1188    switch( e->eventType )
1189    {
1190        case TR_PEER_UPLOAD_ONLY:
1191            /* update our atom */
1192            if( peer ) {
1193                if( e->uploadOnly ) {
1194                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1195                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1196                } else {
1197                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1198                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1199                }
1200            }
1201            break;
1202
1203        case TR_PEER_PEER_GOT_DATA:
1204        {
1205            const time_t now = tr_time( );
1206            tr_torrent * tor = t->tor;
1207
1208            tr_torrentSetActivityDate( tor, now );
1209
1210            if( e->wasPieceData ) {
1211                tor->uploadedCur += e->length;
1212                tr_torrentSetDirty( tor );
1213            }
1214
1215            /* update the stats */
1216            if( e->wasPieceData )
1217                tr_statsAddUploaded( tor->session, e->length );
1218
1219            /* update our atom */
1220            if( peer && e->wasPieceData )
1221                peer->atom->piece_data_time = now;
1222
1223            tor->needsSeedRatioCheck = TRUE;
1224
1225            break;
1226        }
1227
1228        case TR_PEER_CLIENT_GOT_REJ:
1229            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) );
1230            break;
1231
1232        case TR_PEER_CLIENT_GOT_CHOKE:
1233            peerDeclinedAllRequests( t, peer );
1234            break;
1235
1236        case TR_PEER_CLIENT_GOT_PORT:
1237            if( peer )
1238                peer->atom->port = e->port;
1239            break;
1240
1241        case TR_PEER_CLIENT_GOT_SUGGEST:
1242            if( peer )
1243                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1244            break;
1245
1246        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1247            if( peer )
1248                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1249            break;
1250
1251        case TR_PEER_CLIENT_GOT_DATA:
1252        {
1253            const time_t now = tr_time( );
1254            tr_torrent * tor = t->tor;
1255
1256            tr_torrentSetActivityDate( tor, now );
1257
1258            /* only add this to downloadedCur if we got it from a peer --
1259             * webseeds shouldn't count against our ratio.  As one tracker
1260             * admin put it, "Those pieces are downloaded directly from the
1261             * content distributor, not the peers, it is the tracker's job
1262             * to manage the swarms, not the web server and does not fit
1263             * into the jurisdiction of the tracker." */
1264            if( peer && e->wasPieceData ) {
1265                tor->downloadedCur += e->length;
1266                tr_torrentSetDirty( tor );
1267            }
1268
1269            /* update the stats */
1270            if( e->wasPieceData )
1271                tr_statsAddDownloaded( tor->session, e->length );
1272
1273            /* update our atom */
1274            if( peer && e->wasPieceData )
1275                peer->atom->piece_data_time = now;
1276
1277            break;
1278        }
1279
1280        case TR_PEER_PEER_PROGRESS:
1281        {
1282            if( peer )
1283            {
1284                struct peer_atom * atom = peer->atom;
1285                if( e->progress >= 1.0 ) {
1286                    tordbg( t, "marking peer %s as a seed",
1287                            tr_atomAddrStr( atom ) );
1288                    atom->flags |= ADDED_F_SEED_FLAG;
1289                }
1290            }
1291            break;
1292        }
1293
1294        case TR_PEER_CLIENT_GOT_BLOCK:
1295        {
1296            tr_torrent * tor = t->tor;
1297            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1298
1299            requestListRemove( t, block );
1300            pieceListRemoveRequest( t, block );
1301
1302            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1303            {
1304                tordbg( t, "we have this block already..." );
1305                clientGotUnwantedBlock( tor, block );
1306            }
1307            else
1308            {
1309                tr_cpBlockAdd( &tor->completion, block );
1310                tr_torrentSetDirty( tor );
1311
1312                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1313                {
1314                    const tr_piece_index_t p = e->pieceIndex;
1315                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1316
1317                    if( !ok )
1318                    {
1319                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1320                                   (unsigned long)p );
1321                    }
1322
1323                    tr_torrentSetHasPiece( tor, p, ok );
1324                    tr_torrentSetPieceChecked( tor, p, TRUE );
1325                    tr_peerMgrSetBlame( tor, p, ok );
1326
1327                    if( !ok )
1328                    {
1329                        gotBadPiece( t, p );
1330                    }
1331                    else
1332                    {
1333                        int i;
1334                        int peerCount;
1335                        tr_peer ** peers;
1336                        tr_file_index_t fileIndex;
1337
1338                        peerCount = tr_ptrArraySize( &t->peers );
1339                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1340                        for( i=0; i<peerCount; ++i )
1341                            tr_peerMsgsHave( peers[i]->msgs, p );
1342
1343                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1344                            const tr_file * file = &tor->info.files[fileIndex];
1345                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1346                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1347                                    tr_torrentFileCompleted( tor, fileIndex );
1348                        }
1349
1350                        pieceListRemovePiece( t, p );
1351                    }
1352                }
1353
1354                t->needsCompletenessCheck = TRUE;
1355            }
1356            break;
1357        }
1358
1359        case TR_PEER_ERROR:
1360            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1361            {
1362                /* some protocol error from the peer */
1363                peer->doPurge = 1;
1364                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1365                        tr_atomAddrStr( peer->atom ) );
1366            }
1367            else
1368            {
1369                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1370            }
1371            break;
1372
1373        default:
1374            assert( 0 );
1375    }
1376
1377    torrentUnlock( t );
1378}
1379
1380static int
1381getDefaultShelfLife( uint8_t from )
1382{
1383    /* in general, peers obtained from firsthand contact
1384     * are better than those from secondhand, etc etc */
1385    switch( from )
1386    {
1387        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1388        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1389        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1390        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1391        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1392        case TR_PEER_FROM_RESUME   : return 60 * 60;
1393        default                    : return 60 * 60;
1394    }
1395}
1396
1397
1398static void
1399ensureAtomExists( Torrent           * t,
1400                  const tr_address  * addr,
1401                  const tr_port       port,
1402                  const uint8_t       flags,
1403                  const uint8_t       from )
1404{
1405    assert( tr_isAddress( addr ) );
1406    assert( from < TR_PEER_FROM__MAX );
1407
1408    if( getExistingAtom( t, addr ) == NULL )
1409    {
1410        struct peer_atom * a;
1411        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1412
1413        a = tr_new0( struct peer_atom, 1 );
1414        a->addr = *addr;
1415        a->port = port;
1416        a->flags = flags;
1417        a->from = from;
1418        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1419        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1420
1421        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1422    }
1423}
1424
1425static int
1426getMaxPeerCount( const tr_torrent * tor )
1427{
1428    return tor->maxConnectedPeers;
1429}
1430
1431static int
1432getPeerCount( const Torrent * t )
1433{
1434    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1435}
1436
1437/* FIXME: this is kind of a mess. */
1438static tr_bool
1439myHandshakeDoneCB( tr_handshake  * handshake,
1440                   tr_peerIo     * io,
1441                   tr_bool         isConnected,
1442                   const uint8_t * peer_id,
1443                   void          * vmanager )
1444{
1445    tr_bool            ok = isConnected;
1446    tr_bool            success = FALSE;
1447    tr_port            port;
1448    const tr_address * addr;
1449    tr_peerMgr       * manager = vmanager;
1450    Torrent          * t;
1451    tr_handshake     * ours;
1452
1453    assert( io );
1454    assert( tr_isBool( ok ) );
1455
1456    t = tr_peerIoHasTorrentHash( io )
1457        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1458        : NULL;
1459
1460    if( tr_peerIoIsIncoming ( io ) )
1461        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1462                                        handshake, handshakeCompare );
1463    else if( t )
1464        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1465                                        handshake, handshakeCompare );
1466    else
1467        ours = handshake;
1468
1469    assert( ours );
1470    assert( ours == handshake );
1471
1472    if( t )
1473        torrentLock( t );
1474
1475    addr = tr_peerIoGetAddress( io, &port );
1476
1477    if( !ok || !t || !t->isRunning )
1478    {
1479        if( t )
1480        {
1481            struct peer_atom * atom = getExistingAtom( t, addr );
1482            if( atom )
1483                ++atom->numFails;
1484        }
1485    }
1486    else /* looking good */
1487    {
1488        struct peer_atom * atom;
1489
1490        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1491        atom = getExistingAtom( t, addr );
1492        atom->time = tr_time( );
1493        atom->piece_data_time = 0;
1494
1495        if( atom->myflags & MYFLAG_BANNED )
1496        {
1497            tordbg( t, "banned peer %s tried to reconnect",
1498                    tr_atomAddrStr( atom ) );
1499        }
1500        else if( tr_peerIoIsIncoming( io )
1501               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1502
1503        {
1504        }
1505        else
1506        {
1507            tr_peer * peer = atom->peer;
1508
1509            if( peer ) /* we already have this peer */
1510            {
1511            }
1512            else
1513            {
1514                peer = getPeer( t, atom );
1515                tr_free( peer->client );
1516
1517                if( !peer_id )
1518                    peer->client = NULL;
1519                else {
1520                    char client[128];
1521                    tr_clientForId( client, sizeof( client ), peer_id );
1522                    peer->client = tr_strdup( client );
1523                }
1524
1525                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1526                                                                balanced by our unref in peerDestructor()  */
1527                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1528                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1529
1530                success = TRUE;
1531            }
1532        }
1533    }
1534
1535    if( t )
1536        torrentUnlock( t );
1537
1538    return success;
1539}
1540
1541void
1542tr_peerMgrAddIncoming( tr_peerMgr * manager,
1543                       tr_address * addr,
1544                       tr_port      port,
1545                       int          socket )
1546{
1547    tr_session * session;
1548
1549    managerLock( manager );
1550
1551    assert( tr_isSession( manager->session ) );
1552    session = manager->session;
1553
1554    if( tr_sessionIsAddressBlocked( session, addr ) )
1555    {
1556        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1557        tr_netClose( session, socket );
1558    }
1559    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1560    {
1561        tr_netClose( session, socket );
1562    }
1563    else /* we don't have a connection to them yet... */
1564    {
1565        tr_peerIo *    io;
1566        tr_handshake * handshake;
1567
1568        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1569
1570        handshake = tr_handshakeNew( io,
1571                                     session->encryptionMode,
1572                                     myHandshakeDoneCB,
1573                                     manager );
1574
1575        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1576
1577        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1578                                 handshakeCompare );
1579    }
1580
1581    managerUnlock( manager );
1582}
1583
1584static tr_bool
1585tr_isPex( const tr_pex * pex )
1586{
1587    return pex && tr_isAddress( &pex->addr );
1588}
1589
1590void
1591tr_peerMgrAddPex( tr_torrent   *  tor,
1592                  uint8_t         from,
1593                  const tr_pex *  pex )
1594{
1595    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1596    {
1597        Torrent * t = tor->torrentPeers;
1598        managerLock( t->manager );
1599
1600        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1601            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1602                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1603
1604        managerUnlock( t->manager );
1605    }
1606}
1607
1608tr_pex *
1609tr_peerMgrCompactToPex( const void *    compact,
1610                        size_t          compactLen,
1611                        const uint8_t * added_f,
1612                        size_t          added_f_len,
1613                        size_t *        pexCount )
1614{
1615    size_t          i;
1616    size_t          n = compactLen / 6;
1617    const uint8_t * walk = compact;
1618    tr_pex *        pex = tr_new0( tr_pex, n );
1619
1620    for( i = 0; i < n; ++i )
1621    {
1622        pex[i].addr.type = TR_AF_INET;
1623        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1624        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1625        if( added_f && ( n == added_f_len ) )
1626            pex[i].flags = added_f[i];
1627    }
1628
1629    *pexCount = n;
1630    return pex;
1631}
1632
1633tr_pex *
1634tr_peerMgrCompact6ToPex( const void    * compact,
1635                         size_t          compactLen,
1636                         const uint8_t * added_f,
1637                         size_t          added_f_len,
1638                         size_t        * pexCount )
1639{
1640    size_t          i;
1641    size_t          n = compactLen / 18;
1642    const uint8_t * walk = compact;
1643    tr_pex *        pex = tr_new0( tr_pex, n );
1644
1645    for( i = 0; i < n; ++i )
1646    {
1647        pex[i].addr.type = TR_AF_INET6;
1648        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1649        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1650        if( added_f && ( n == added_f_len ) )
1651            pex[i].flags = added_f[i];
1652    }
1653
1654    *pexCount = n;
1655    return pex;
1656}
1657
1658tr_pex *
1659tr_peerMgrArrayToPex( const void * array,
1660                      size_t       arrayLen,
1661                      size_t      * pexCount )
1662{
1663    size_t          i;
1664    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1665    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1666    const uint8_t * walk = array;
1667    tr_pex        * pex = tr_new0( tr_pex, n );
1668
1669    for( i = 0 ; i < n ; i++ ) {
1670        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1671        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1672        pex[i].flags = 0x00;
1673        walk += sizeof( tr_address ) + 2;
1674    }
1675
1676    *pexCount = n;
1677    return pex;
1678}
1679
1680/**
1681***
1682**/
1683
1684void
1685tr_peerMgrSetBlame( tr_torrent     * tor,
1686                    tr_piece_index_t pieceIndex,
1687                    int              success )
1688{
1689    if( !success )
1690    {
1691        int        peerCount, i;
1692        Torrent *  t = tor->torrentPeers;
1693        tr_peer ** peers;
1694
1695        assert( torrentIsLocked( t ) );
1696
1697        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1698        for( i = 0; i < peerCount; ++i )
1699        {
1700            tr_peer * peer = peers[i];
1701            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1702            {
1703                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1704                        tr_atomAddrStr( peer->atom ),
1705                        pieceIndex, (int)peer->strikes + 1 );
1706                addStrike( t, peer );
1707            }
1708        }
1709    }
1710}
1711
1712int
1713tr_pexCompare( const void * va, const void * vb )
1714{
1715    const tr_pex * a = va;
1716    const tr_pex * b = vb;
1717    int i;
1718
1719    assert( tr_isPex( a ) );
1720    assert( tr_isPex( b ) );
1721
1722    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1723        return i;
1724
1725    if( a->port != b->port )
1726        return a->port < b->port ? -1 : 1;
1727
1728    return 0;
1729}
1730
1731#if 0
1732static int
1733peerPrefersCrypto( const tr_peer * peer )
1734{
1735    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1736        return TRUE;
1737
1738    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1739        return FALSE;
1740
1741    return tr_peerIoIsEncrypted( peer->io );
1742}
1743#endif
1744
1745/* better goes first */
1746static int
1747compareAtomsByUsefulness( const void * va, const void *vb )
1748{
1749    const struct peer_atom * a = * (const struct peer_atom**) va;
1750    const struct peer_atom * b = * (const struct peer_atom**) vb;
1751
1752    assert( tr_isAtom( a ) );
1753    assert( tr_isAtom( b ) );
1754
1755    if( a->piece_data_time != b->piece_data_time )
1756        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1757    if( a->from != b->from )
1758        return a->from < b->from ? -1 : 1;
1759    if( a->numFails != b->numFails )
1760        return a->numFails < b->numFails ? -1 : 1;
1761
1762    return 0;
1763}
1764
1765int
1766tr_peerMgrGetPeers( tr_torrent   * tor,
1767                    tr_pex      ** setme_pex,
1768                    uint8_t        af,
1769                    uint8_t        list_mode,
1770                    int            maxCount )
1771{
1772    int i;
1773    int n;
1774    int count = 0;
1775    int atomCount = 0;
1776    const Torrent * t = tor->torrentPeers;
1777    struct peer_atom ** atoms = NULL;
1778    tr_pex * pex;
1779    tr_pex * walk;
1780
1781    assert( tr_isTorrent( tor ) );
1782    assert( setme_pex != NULL );
1783    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1784    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1785
1786    managerLock( t->manager );
1787
1788    /**
1789    ***  build a list of atoms
1790    **/
1791
1792    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1793    {
1794        int i;
1795        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1796        atomCount = tr_ptrArraySize( &t->peers );
1797        atoms = tr_new( struct peer_atom *, atomCount );
1798        for( i=0; i<atomCount; ++i )
1799            atoms[i] = peers[i]->atom;
1800    }
1801    else /* TR_PEERS_ALL */
1802    {
1803        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1804        atomCount = tr_ptrArraySize( &t->pool );
1805        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1806    }
1807
1808    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1809
1810    /**
1811    ***  add the first N of them into our return list
1812    **/
1813
1814    n = MIN( atomCount, maxCount );
1815    pex = walk = tr_new0( tr_pex, n );
1816
1817    for( i=0; i<atomCount && count<n; ++i )
1818    {
1819        const struct peer_atom * atom = atoms[i];
1820        if( atom->addr.type == af )
1821        {
1822            assert( tr_isAddress( &atom->addr ) );
1823            walk->addr = atom->addr;
1824            walk->port = atom->port;
1825            walk->flags = atom->flags;
1826            ++count;
1827            ++walk;
1828        }
1829    }
1830
1831    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1832
1833    assert( ( walk - pex ) == count );
1834    *setme_pex = pex;
1835
1836    /* cleanup */
1837    tr_free( atoms );
1838    managerUnlock( t->manager );
1839    return count;
1840}
1841
1842static int atomPulse      ( void * vmgr );
1843static int bandwidthPulse ( void * vmgr );
1844static int rechokePulse   ( void * vmgr );
1845static int reconnectPulse ( void * vmgr );
1846
1847static void
1848ensureMgrTimersExist( struct tr_peerMgr * m )
1849{
1850    tr_session * s = m->session;
1851
1852    if( m->atomTimer == NULL )
1853        m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
1854
1855    if( m->bandwidthTimer == NULL )
1856        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1857
1858    if( m->rechokeTimer == NULL )
1859        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1860
1861    if( m->reconnectTimer == NULL )
1862        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1863
1864    if( m->refillUpkeepTimer == NULL )
1865        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1866}
1867
1868void
1869tr_peerMgrStartTorrent( tr_torrent * tor )
1870{
1871    Torrent * t = tor->torrentPeers;
1872
1873    assert( t != NULL );
1874    managerLock( t->manager );
1875    ensureMgrTimersExist( t->manager );
1876
1877    t->isRunning = TRUE;
1878
1879    rechokePulse( t->manager );
1880    managerUnlock( t->manager );
1881}
1882
1883static void
1884stopTorrent( Torrent * t )
1885{
1886    int i, n;
1887
1888    assert( torrentIsLocked( t ) );
1889
1890    t->isRunning = FALSE;
1891
1892    /* disconnect the peers. */
1893    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1894        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1895    tr_ptrArrayClear( &t->peers );
1896
1897    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1898     * which removes the handshake from t->outgoingHandshakes... */
1899    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1900        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1901}
1902
1903void
1904tr_peerMgrStopTorrent( tr_torrent * tor )
1905{
1906    Torrent * t = tor->torrentPeers;
1907
1908    managerLock( t->manager );
1909
1910    stopTorrent( t );
1911
1912    managerUnlock( t->manager );
1913}
1914
1915void
1916tr_peerMgrAddTorrent( tr_peerMgr * manager,
1917                      tr_torrent * tor )
1918{
1919    managerLock( manager );
1920
1921    assert( tor );
1922    assert( tor->torrentPeers == NULL );
1923
1924    tor->torrentPeers = torrentConstructor( manager, tor );
1925
1926    managerUnlock( manager );
1927}
1928
1929void
1930tr_peerMgrRemoveTorrent( tr_torrent * tor )
1931{
1932    tr_torrentLock( tor );
1933
1934    stopTorrent( tor->torrentPeers );
1935    torrentDestructor( tor->torrentPeers );
1936
1937    tr_torrentUnlock( tor );
1938}
1939
1940void
1941tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1942                               int8_t           * tab,
1943                               unsigned int       tabCount )
1944{
1945    tr_piece_index_t   i;
1946    const Torrent *    t;
1947    float              interval;
1948    tr_bool            isSeed;
1949    int                peerCount;
1950    const tr_peer **   peers;
1951    tr_torrentLock( tor );
1952
1953    t = tor->torrentPeers;
1954    tor = t->tor;
1955    interval = tor->info.pieceCount / (float)tabCount;
1956    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1957    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1958    peerCount = tr_ptrArraySize( &t->peers );
1959
1960    memset( tab, 0, tabCount );
1961
1962    for( i = 0; tor && i < tabCount; ++i )
1963    {
1964        const int piece = i * interval;
1965
1966        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1967            tab[i] = -1;
1968        else if( peerCount ) {
1969            int j;
1970            for( j = 0; j < peerCount; ++j )
1971                if( tr_bitsetHas( &peers[j]->have, i ) )
1972                    ++tab[i];
1973        }
1974    }
1975
1976    tr_torrentUnlock( tor );
1977}
1978
1979/* Returns the pieces that are available from peers */
1980tr_bitfield*
1981tr_peerMgrGetAvailable( const tr_torrent * tor )
1982{
1983    int i;
1984    int peerCount;
1985    Torrent * t = tor->torrentPeers;
1986    const tr_peer ** peers;
1987    tr_bitfield * pieces;
1988    managerLock( t->manager );
1989
1990    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1991    peerCount = tr_ptrArraySize( &t->peers );
1992    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1993    for( i=0; i<peerCount; ++i )
1994        tr_bitsetOr( pieces, &peers[i]->have );
1995
1996    managerUnlock( t->manager );
1997    return pieces;
1998}
1999
2000void
2001tr_peerMgrTorrentStats( tr_torrent       * tor,
2002                        int              * setmePeersKnown,
2003                        int              * setmePeersConnected,
2004                        int              * setmeSeedsConnected,
2005                        int              * setmeWebseedsSendingToUs,
2006                        int              * setmePeersSendingToUs,
2007                        int              * setmePeersGettingFromUs,
2008                        int              * setmePeersFrom )
2009{
2010    int i, size;
2011    const Torrent * t = tor->torrentPeers;
2012    const tr_peer ** peers;
2013    const tr_webseed ** webseeds;
2014
2015    managerLock( t->manager );
2016
2017    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2018    size = tr_ptrArraySize( &t->peers );
2019
2020    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2021    *setmePeersConnected       = 0;
2022    *setmeSeedsConnected       = 0;
2023    *setmePeersGettingFromUs   = 0;
2024    *setmePeersSendingToUs     = 0;
2025    *setmeWebseedsSendingToUs  = 0;
2026
2027    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2028        setmePeersFrom[i] = 0;
2029
2030    for( i=0; i<size; ++i )
2031    {
2032        const tr_peer * peer = peers[i];
2033        const struct peer_atom * atom = peer->atom;
2034
2035        if( peer->io == NULL ) /* not connected */
2036            continue;
2037
2038        ++*setmePeersConnected;
2039
2040        ++setmePeersFrom[atom->from];
2041
2042        if( clientIsDownloadingFrom( tor, peer ) )
2043            ++*setmePeersSendingToUs;
2044
2045        if( clientIsUploadingTo( peer ) )
2046            ++*setmePeersGettingFromUs;
2047
2048        if( atom->flags & ADDED_F_SEED_FLAG )
2049            ++*setmeSeedsConnected;
2050    }
2051
2052    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2053    size = tr_ptrArraySize( &t->webseeds );
2054    for( i=0; i<size; ++i )
2055        if( tr_webseedIsActive( webseeds[i] ) )
2056            ++*setmeWebseedsSendingToUs;
2057
2058    managerUnlock( t->manager );
2059}
2060
2061float
2062tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2063{
2064    int i;
2065    float tmp;
2066    float ret = 0;
2067
2068    const Torrent * t = tor->torrentPeers;
2069    const int n = tr_ptrArraySize( &t->webseeds );
2070    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2071
2072    for( i=0; i<n; ++i )
2073        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2074            ret += tmp;
2075
2076    return ret;
2077}
2078
2079
2080float*
2081tr_peerMgrWebSpeeds( const tr_torrent * tor )
2082{
2083    const Torrent * t = tor->torrentPeers;
2084    const tr_webseed ** webseeds;
2085    int i;
2086    int webseedCount;
2087    float * ret;
2088    uint64_t now;
2089
2090    assert( t->manager );
2091    managerLock( t->manager );
2092
2093    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2094    webseedCount = tr_ptrArraySize( &t->webseeds );
2095    assert( webseedCount == tor->info.webseedCount );
2096    ret = tr_new0( float, webseedCount );
2097    now = tr_date( );
2098
2099    for( i=0; i<webseedCount; ++i )
2100        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2101            ret[i] = -1.0;
2102
2103    managerUnlock( t->manager );
2104    return ret;
2105}
2106
2107double
2108tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2109{
2110    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2111}
2112
2113
2114struct tr_peer_stat *
2115tr_peerMgrPeerStats( const tr_torrent    * tor,
2116                     int                 * setmeCount )
2117{
2118    int i, size;
2119    const Torrent * t = tor->torrentPeers;
2120    const tr_peer ** peers;
2121    tr_peer_stat * ret;
2122    uint64_t now;
2123
2124    assert( t->manager );
2125    managerLock( t->manager );
2126
2127    size = tr_ptrArraySize( &t->peers );
2128    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2129    ret = tr_new0( tr_peer_stat, size );
2130    now = tr_date( );
2131
2132    for( i=0; i<size; ++i )
2133    {
2134        char *                   pch;
2135        const tr_peer *          peer = peers[i];
2136        const struct peer_atom * atom = peer->atom;
2137        tr_peer_stat *           stat = ret + i;
2138
2139        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2140        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2141                   sizeof( stat->client ) );
2142        stat->port               = ntohs( peer->atom->port );
2143        stat->from               = atom->from;
2144        stat->progress           = peer->progress;
2145        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2146        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2147        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2148        stat->peerIsChoked       = peer->peerIsChoked;
2149        stat->peerIsInterested   = peer->peerIsInterested;
2150        stat->clientIsChoked     = peer->clientIsChoked;
2151        stat->clientIsInterested = peer->clientIsInterested;
2152        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
2153        stat->isDownloadingFrom  = clientIsDownloadingFrom( tor, peer );
2154        stat->isUploadingTo      = clientIsUploadingTo( peer );
2155        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2156
2157        pch = stat->flagStr;
2158        if( t->optimistic == peer ) *pch++ = 'O';
2159        if( stat->isDownloadingFrom ) *pch++ = 'D';
2160        else if( stat->clientIsInterested ) *pch++ = 'd';
2161        if( stat->isUploadingTo ) *pch++ = 'U';
2162        else if( stat->peerIsInterested ) *pch++ = 'u';
2163        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2164        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2165        if( stat->isEncrypted ) *pch++ = 'E';
2166        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2167        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2168        if( stat->isIncoming ) *pch++ = 'I';
2169        *pch = '\0';
2170    }
2171
2172    *setmeCount = size;
2173
2174    managerUnlock( t->manager );
2175    return ret;
2176}
2177
2178/**
2179***
2180**/
2181
2182struct ChokeData
2183{
2184    tr_bool         doUnchoke;
2185    tr_bool         isInterested;
2186    tr_bool         isChoked;
2187    int             rate;
2188    tr_peer *       peer;
2189};
2190
2191static int
2192compareChoke( const void * va,
2193              const void * vb )
2194{
2195    const struct ChokeData * a = va;
2196    const struct ChokeData * b = vb;
2197
2198    if( a->rate != b->rate ) /* prefer higher overall speeds */
2199        return a->rate > b->rate ? -1 : 1;
2200
2201    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2202        return a->isChoked ? 1 : -1;
2203
2204    return 0;
2205}
2206
2207static int
2208isNew( const tr_peer * peer )
2209{
2210    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2211}
2212
2213static int
2214isSame( const tr_peer * peer )
2215{
2216    return peer && peer->client && strstr( peer->client, "Transmission" );
2217}
2218
2219/**
2220***
2221**/
2222
2223static void
2224rechokeTorrent( Torrent * t, const uint64_t now )
2225{
2226    int i, size, unchokedInterested;
2227    const int peerCount = tr_ptrArraySize( &t->peers );
2228    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2229    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2230    const tr_session * session = t->manager->session;
2231    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2232
2233    assert( torrentIsLocked( t ) );
2234
2235    /* sort the peers by preference and rate */
2236    for( i = 0, size = 0; i < peerCount; ++i )
2237    {
2238        tr_peer * peer = peers[i];
2239        struct peer_atom * atom = peer->atom;
2240
2241        if( peer->progress >= 1.0 ) /* choke all seeds */
2242        {
2243            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2244        }
2245        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2246        {
2247            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2248        }
2249        else if( chokeAll ) /* choke everyone if we're not uploading */
2250        {
2251            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2252        }
2253        else
2254        {
2255            struct ChokeData * n = &choke[size++];
2256            n->peer         = peer;
2257            n->isInterested = peer->peerIsInterested;
2258            n->isChoked     = peer->peerIsChoked;
2259            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2260        }
2261    }
2262
2263    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2264
2265    /**
2266     * Reciprocation and number of uploads capping is managed by unchoking
2267     * the N peers which have the best upload rate and are interested.
2268     * This maximizes the client's download rate. These N peers are
2269     * referred to as downloaders, because they are interested in downloading
2270     * from the client.
2271     *
2272     * Peers which have a better upload rate (as compared to the downloaders)
2273     * but aren't interested get unchoked. If they become interested, the
2274     * downloader with the worst upload rate gets choked. If a client has
2275     * a complete file, it uses its upload rate rather than its download
2276     * rate to decide which peers to unchoke.
2277     */
2278    unchokedInterested = 0;
2279    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2280        choke[i].doUnchoke = 1;
2281        if( choke[i].isInterested )
2282            ++unchokedInterested;
2283    }
2284
2285    /* optimistic unchoke */
2286    if( i < size )
2287    {
2288        int n;
2289        struct ChokeData * c;
2290        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2291
2292        for( ; i<size; ++i )
2293        {
2294            if( choke[i].isInterested )
2295            {
2296                const tr_peer * peer = choke[i].peer;
2297                int x = 1, y;
2298                if( isNew( peer ) ) x *= 3;
2299                if( isSame( peer ) ) x *= 3;
2300                for( y=0; y<x; ++y )
2301                    tr_ptrArrayAppend( &randPool, &choke[i] );
2302            }
2303        }
2304
2305        if(( n = tr_ptrArraySize( &randPool )))
2306        {
2307            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2308            c->doUnchoke = 1;
2309            t->optimistic = c->peer;
2310        }
2311
2312        tr_ptrArrayDestruct( &randPool, NULL );
2313    }
2314
2315    for( i=0; i<size; ++i )
2316        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2317
2318    /* cleanup */
2319    tr_free( choke );
2320}
2321
2322static int
2323rechokePulse( void * vmgr )
2324{
2325    uint64_t now;
2326    tr_torrent * tor = NULL;
2327    tr_peerMgr * mgr = vmgr;
2328    managerLock( mgr );
2329
2330    now = tr_date( );
2331    while(( tor = tr_torrentNext( mgr->session, tor )))
2332        if( tor->isRunning )
2333            rechokeTorrent( tor->torrentPeers, now );
2334
2335    managerUnlock( mgr );
2336    return TRUE;
2337}
2338
2339/***
2340****
2341****  Life and Death
2342****
2343***/
2344
2345typedef enum
2346{
2347    TR_CAN_KEEP,
2348    TR_CAN_CLOSE,
2349    TR_MUST_CLOSE,
2350}
2351tr_close_type_t;
2352
2353static tr_close_type_t
2354shouldPeerBeClosed( const Torrent    * t,
2355                    const tr_peer    * peer,
2356                    int                peerCount,
2357                    const time_t       now )
2358{
2359    const tr_torrent *       tor = t->tor;
2360    const struct peer_atom * atom = peer->atom;
2361
2362    /* if it's marked for purging, close it */
2363    if( peer->doPurge )
2364    {
2365        tordbg( t, "purging peer %s because its doPurge flag is set",
2366                tr_atomAddrStr( atom ) );
2367        return TR_MUST_CLOSE;
2368    }
2369
2370    /* if we're seeding and the peer has everything we have,
2371     * and enough time has passed for a pex exchange, then disconnect */
2372    if( tr_torrentIsSeed( tor ) )
2373    {
2374        int peerHasEverything;
2375        if( atom->flags & ADDED_F_SEED_FLAG )
2376            peerHasEverything = TRUE;
2377        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2378            peerHasEverything = FALSE;
2379        else {
2380            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2381            tr_bitsetDifference( tmp, &peer->have );
2382            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2383            tr_bitfieldFree( tmp );
2384        }
2385
2386        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2387        {
2388            tordbg( t, "purging peer %s because we're both seeds",
2389                    tr_atomAddrStr( atom ) );
2390            return TR_MUST_CLOSE;
2391        }
2392    }
2393
2394    /* disconnect if it's been too long since piece data has been transferred.
2395     * this is on a sliding scale based on number of available peers... */
2396    {
2397        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2398        /* if we have >= relaxIfFewerThan, strictness is 100%.
2399         * if we have zero connections, strictness is 0% */
2400        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2401                               ? 1.0
2402                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2403        const int lo = MIN_UPLOAD_IDLE_SECS;
2404        const int hi = MAX_UPLOAD_IDLE_SECS;
2405        const int limit = hi - ( ( hi - lo ) * strictness );
2406        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2407/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2408        if( idleTime > limit ) {
2409            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2410                       tr_atomAddrStr( atom ), idleTime );
2411            return TR_CAN_CLOSE;
2412        }
2413    }
2414
2415    return TR_CAN_KEEP;
2416}
2417
2418static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2419
2420static tr_peer **
2421getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2422{
2423    int i, peerCount, outsize;
2424    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2425    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2426
2427    assert( torrentIsLocked( t ) );
2428
2429    for( i = outsize = 0; i < peerCount; ++i )
2430        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2431            ret[outsize++] = peers[i];
2432
2433    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2434
2435    *setmeSize = outsize;
2436    return ret;
2437}
2438
2439static int
2440compareCandidates( const void * va, const void * vb )
2441{
2442    const struct peer_atom * a = *(const struct peer_atom**) va;
2443    const struct peer_atom * b = *(const struct peer_atom**) vb;
2444
2445    /* <Charles> Here we would probably want to try reconnecting to
2446     * peers that had most recently given us data. Lots of users have
2447     * trouble with resets due to their routers and/or ISPs. This way we
2448     * can quickly recover from an unwanted reset. So we sort
2449     * piece_data_time in descending order.
2450     */
2451
2452    if( a->piece_data_time != b->piece_data_time )
2453        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2454
2455    if( a->numFails != b->numFails )
2456        return a->numFails < b->numFails ? -1 : 1;
2457
2458    if( a->time != b->time )
2459        return a->time < b->time ? -1 : 1;
2460
2461    /* In order to avoid fragmenting the swarm, peers from trackers and
2462     * from the DHT should be preferred to peers from PEX. */
2463    if( a->from != b->from )
2464        return a->from < b->from ? -1 : 1;
2465
2466    return 0;
2467}
2468
2469static int
2470getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2471{
2472    int sec;
2473
2474    /* if we were recently connected to this peer and transferring piece
2475     * data, try to reconnect to them sooner rather that later -- we don't
2476     * want network troubles to get in the way of a good peer. */
2477    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2478        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2479
2480    /* don't allow reconnects more often than our minimum */
2481    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2482        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2483
2484    /* otherwise, the interval depends on how many times we've tried
2485     * and failed to connect to the peer */
2486    else switch( atom->numFails ) {
2487        case 0: sec = 0; break;
2488        case 1: sec = 5; break;
2489        case 2: sec = 2 * 60; break;
2490        case 3: sec = 15 * 60; break;
2491        case 4: sec = 30 * 60; break;
2492        case 5: sec = 60 * 60; break;
2493        default: sec = 120 * 60; break;
2494    }
2495
2496    return sec;
2497}
2498
2499static struct peer_atom **
2500getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2501{
2502    int                 i, atomCount, retCount;
2503    struct peer_atom ** atoms;
2504    struct peer_atom ** ret;
2505    const int           seed = tr_torrentIsSeed( t->tor );
2506
2507    assert( torrentIsLocked( t ) );
2508
2509    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2510    ret = tr_new( struct peer_atom*, atomCount );
2511    for( i = retCount = 0; i < atomCount; ++i )
2512    {
2513        int                interval;
2514        struct peer_atom * atom = atoms[i];
2515
2516        /* peer fed us too much bad data ... we only keep it around
2517         * now to weed it out in case someone sends it to us via pex */
2518        if( atom->myflags & MYFLAG_BANNED )
2519            continue;
2520
2521        /* peer was unconnectable before, so we're not going to keep trying.
2522         * this is needs a separate flag from `banned', since if they try
2523         * to connect to us later, we'll let them in */
2524        if( atom->myflags & MYFLAG_UNREACHABLE )
2525            continue;
2526
2527        /* no need to connect if we're both seeds... */
2528        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2529                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2530            continue;
2531
2532        /* don't reconnect too often */
2533        interval = getReconnectIntervalSecs( atom, now );
2534        if( ( now - atom->time ) < interval )
2535        {
2536            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2537                    i, tr_atomAddrStr( atom ), interval );
2538            continue;
2539        }
2540
2541        /* Don't connect to peers in our blocklist */
2542        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2543            continue;
2544
2545        /* we don't need two connections to the same peer... */
2546        if( peerIsInUse( t, atom ) )
2547            continue;
2548
2549        ret[retCount++] = atom;
2550    }
2551
2552    if( retCount != 0 )
2553        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2554    *setmeSize = retCount;
2555    return ret;
2556}
2557
2558static void
2559closePeer( Torrent * t, tr_peer * peer )
2560{
2561    struct peer_atom * atom;
2562
2563    assert( t != NULL );
2564    assert( peer != NULL );
2565
2566    atom = peer->atom;
2567
2568    /* if we transferred piece data, then they might be good peers,
2569       so reset their `numFails' weight to zero.  otherwise we connected
2570       to them fruitlessly, so mark it as another fail */
2571    if( atom->piece_data_time )
2572        atom->numFails = 0;
2573    else
2574        ++atom->numFails;
2575
2576    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2577    removePeer( t, peer );
2578}
2579
2580static void
2581reconnectTorrent( Torrent * t )
2582{
2583    static time_t prevTime = 0;
2584    static int    newConnectionsThisSecond = 0;
2585    const time_t  now = tr_time( );
2586
2587    if( prevTime != now )
2588    {
2589        prevTime = now;
2590        newConnectionsThisSecond = 0;
2591    }
2592
2593    if( !t->isRunning )
2594    {
2595        removeAllPeers( t );
2596    }
2597    else
2598    {
2599        int i;
2600        int mustCloseCount;
2601        int maxCandidates;
2602        struct tr_peer ** mustClose;
2603
2604        /* disconnect the really bad peers */
2605        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2606        for( i=0; i<mustCloseCount; ++i )
2607            closePeer( t, mustClose[i] );
2608        tr_free( mustClose );
2609
2610        /* decide how many peers can we try to add in this pass */
2611        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2612        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2613            maxCandidates /= 2;
2614        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2615        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2616
2617        /* select the best candidates, if they are requested */
2618        if( maxCandidates == 0 )
2619        {
2620            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2621                       "NO connection candidates needed, %d atoms, "
2622                       "max per pulse is %d",
2623                       t->tor->info.name, mustCloseCount,
2624                       tr_ptrArraySize( &t->pool ),
2625                       MAX_RECONNECTIONS_PER_PULSE );
2626
2627            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2628                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2629                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2630                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2631                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2632                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2633        }
2634        else
2635        {
2636            int canCloseCount = 0;
2637            int candidateCount;
2638            struct peer_atom ** candidates;
2639
2640            candidates = getPeerCandidates( t, now, &candidateCount );
2641            maxCandidates = MIN( maxCandidates, candidateCount );
2642
2643            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2644            if( maxCandidates != 0 )
2645            {
2646                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2647                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2648                   closePeer( t, canClose[i] );
2649                tr_free( canClose );
2650            }
2651
2652            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2653                       "%d can-close connections, %d connection candidates, "
2654                       "%d atoms, max per pulse is %d",
2655                       t->tor->info.name, mustCloseCount,
2656                       canCloseCount, candidateCount,
2657                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2658
2659            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2660                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2661                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2662                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2663                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2664                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2665
2666            /* add some new ones */
2667            for( i=0; i<maxCandidates; ++i )
2668            {
2669                tr_peerMgr        * mgr = t->manager;
2670                struct peer_atom  * atom = candidates[i];
2671                tr_peerIo         * io;
2672
2673                tordbg( t, "Starting an OUTGOING connection with %s",
2674                        tr_atomAddrStr( atom ) );
2675
2676                io = tr_peerIoNewOutgoing( mgr->session,
2677                                           mgr->session->bandwidth,
2678                                           &atom->addr,
2679                                           atom->port,
2680                                           t->tor->info.hash,
2681                                           t->tor->completeness == TR_SEED );
2682
2683                if( io == NULL )
2684                {
2685                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2686                            tr_atomAddrStr( atom ) );
2687                    atom->myflags |= MYFLAG_UNREACHABLE;
2688                }
2689                else
2690                {
2691                    tr_handshake * handshake = tr_handshakeNew( io,
2692                                                                mgr->session->encryptionMode,
2693                                                                myHandshakeDoneCB,
2694                                                                mgr );
2695
2696                    assert( tr_peerIoGetTorrentHash( io ) );
2697
2698                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2699
2700                    ++newConnectionsThisSecond;
2701
2702                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2703                                             handshakeCompare );
2704                }
2705
2706                atom->time = now;
2707            }
2708            tr_free( candidates );
2709        }
2710    }
2711}
2712
2713struct peer_liveliness
2714{
2715    tr_peer * peer;
2716    void * clientData;
2717    time_t pieceDataTime;
2718    time_t time;
2719    int speed;
2720    tr_bool doPurge;
2721};
2722
2723static int
2724comparePeerLiveliness( const void * va, const void * vb )
2725{
2726    const struct peer_liveliness * a = va;
2727    const struct peer_liveliness * b = vb;
2728
2729    if( a->doPurge != b->doPurge )
2730        return a->doPurge ? 1 : -1;
2731
2732    if( a->speed != b->speed ) /* faster goes first */
2733        return a->speed > b->speed ? -1 : 1;
2734
2735    /* the one to give us data more recently goes first */
2736    if( a->pieceDataTime != b->pieceDataTime )
2737        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2738
2739    /* the one we connected to most recently goes first */
2740    if( a->time != b->time )
2741        return a->time > b->time ? -1 : 1;
2742
2743    return 0;
2744}
2745
2746static int
2747comparePeerLivelinessReverse( const void * va, const void * vb )
2748{
2749    return -comparePeerLiveliness (va, vb);
2750}
2751
2752static void
2753sortPeersByLivelinessImpl( tr_peer  ** peers,
2754                           void     ** clientData,
2755                           int         n,
2756                           uint64_t    now,
2757                           int (*compare) ( const void *va, const void *vb ) )
2758{
2759    int i;
2760    struct peer_liveliness *lives, *l;
2761
2762    /* build a sortable array of peer + extra info */
2763    lives = l = tr_new0( struct peer_liveliness, n );
2764    for( i=0; i<n; ++i, ++l )
2765    {
2766        tr_peer * p = peers[i];
2767        l->peer = p;
2768        l->doPurge = p->doPurge;
2769        l->pieceDataTime = p->atom->piece_data_time;
2770        l->time = p->atom->time;
2771        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2772                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2773        if( clientData )
2774            l->clientData = clientData[i];
2775    }
2776
2777    /* sort 'em */
2778    assert( n == ( l - lives ) );
2779    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2780
2781    /* build the peer array */
2782    for( i=0, l=lives; i<n; ++i, ++l ) {
2783        peers[i] = l->peer;
2784        if( clientData )
2785            clientData[i] = l->clientData;
2786    }
2787    assert( n == ( l - lives ) );
2788
2789    /* cleanup */
2790    tr_free( lives );
2791}
2792
2793static void
2794sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2795{
2796    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2797}
2798
2799static void
2800sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2801{
2802    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2803}
2804
2805
2806static void
2807enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2808{
2809    int n = tr_ptrArraySize( &t->peers );
2810    const int max = tr_torrentGetPeerLimit( t->tor );
2811    if( n > max )
2812    {
2813        void * base = tr_ptrArrayBase( &t->peers );
2814        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2815        sortPeersByLiveliness( peers, NULL, n, now );
2816        while( n > max )
2817            closePeer( t, peers[--n] );
2818        tr_free( peers );
2819    }
2820}
2821
2822static void
2823enforceSessionPeerLimit( tr_session * session, uint64_t now )
2824{
2825    int n = 0;
2826    tr_torrent * tor = NULL;
2827    const int max = tr_sessionGetPeerLimit( session );
2828
2829    /* count the total number of peers */
2830    while(( tor = tr_torrentNext( session, tor )))
2831        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2832
2833    /* if there are too many, prune out the worst */
2834    if( n > max )
2835    {
2836        tr_peer ** peers = tr_new( tr_peer*, n );
2837        Torrent ** torrents = tr_new( Torrent*, n );
2838
2839        /* populate the peer array */
2840        n = 0;
2841        tor = NULL;
2842        while(( tor = tr_torrentNext( session, tor ))) {
2843            int i;
2844            Torrent * t = tor->torrentPeers;
2845            const int tn = tr_ptrArraySize( &t->peers );
2846            for( i=0; i<tn; ++i, ++n ) {
2847                peers[n] = tr_ptrArrayNth( &t->peers, i );
2848                torrents[n] = t;
2849            }
2850        }
2851
2852        /* sort 'em */
2853        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2854
2855        /* cull out the crappiest */
2856        while( n-- > max )
2857            closePeer( torrents[n], peers[n] );
2858
2859        /* cleanup */
2860        tr_free( torrents );
2861        tr_free( peers );
2862    }
2863}
2864
2865
2866static int
2867reconnectPulse( void * vmgr )
2868{
2869    tr_torrent * tor;
2870    tr_peerMgr * mgr = vmgr;
2871    uint64_t now;
2872    managerLock( mgr );
2873
2874    now = tr_date( );
2875
2876    /* if we're over the per-torrent peer limits, cull some peers */
2877    tor = NULL;
2878    while(( tor = tr_torrentNext( mgr->session, tor )))
2879        if( tor->isRunning )
2880            enforceTorrentPeerLimit( tor->torrentPeers, now );
2881
2882    /* if we're over the per-session peer limits, cull some peers */
2883    enforceSessionPeerLimit( mgr->session, now );
2884
2885    tor = NULL;
2886    while(( tor = tr_torrentNext( mgr->session, tor )))
2887        if( tor->isRunning )
2888            reconnectTorrent( tor->torrentPeers );
2889
2890    managerUnlock( mgr );
2891    return TRUE;
2892}
2893
2894/****
2895*****
2896*****  BANDWIDTH ALLOCATION
2897*****
2898****/
2899
2900static void
2901pumpAllPeers( tr_peerMgr * mgr )
2902{
2903    tr_torrent * tor = NULL;
2904
2905    while(( tor = tr_torrentNext( mgr->session, tor )))
2906    {
2907        int j;
2908        Torrent * t = tor->torrentPeers;
2909
2910        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2911        {
2912            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2913            tr_peerMsgsPulse( peer->msgs );
2914        }
2915    }
2916}
2917
2918static int
2919bandwidthPulse( void * vmgr )
2920{
2921    tr_torrent * tor = NULL;
2922    tr_peerMgr * mgr = vmgr;
2923    managerLock( mgr );
2924
2925    /* FIXME: this next line probably isn't necessary... */
2926    pumpAllPeers( mgr );
2927
2928    /* allocate bandwidth to the peers */
2929    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2930    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2931
2932    /* possibly stop torrents that have seeded enough */
2933    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2934        if( tor->needsSeedRatioCheck ) {
2935            tor->needsSeedRatioCheck = FALSE;
2936            tr_torrentCheckSeedRatio( tor );
2937        }
2938    }
2939
2940    /* run the completeness check for any torrents that need it */
2941    tor = NULL;
2942    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2943        if( tor->torrentPeers->needsCompletenessCheck ) {
2944            tor->torrentPeers->needsCompletenessCheck  = FALSE;
2945            tr_torrentRecheckCompleteness( tor );
2946        }
2947    }
2948
2949    /* possibly stop torrents that have an error */
2950    tor = NULL;
2951    while(( tor = tr_torrentNext( mgr->session, tor )))
2952        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2953            tr_torrentStop( tor );
2954
2955    managerUnlock( mgr );
2956    return TRUE;
2957}
2958
2959/***
2960****
2961***/
2962
2963static int
2964compareAtomPtrsByAddress( const void * va, const void *vb )
2965{
2966    const struct peer_atom * a = * (const struct peer_atom**) va;
2967    const struct peer_atom * b = * (const struct peer_atom**) vb;
2968
2969    assert( tr_isAtom( a ) );
2970    assert( tr_isAtom( b ) );
2971
2972    return tr_compareAddresses( &a->addr, &b->addr );
2973}
2974
2975static time_t tr_now = 0;
2976
2977/* best come first, worst go last */
2978static int
2979compareAtomPtrsByShelfDate( const void * va, const void *vb )
2980{
2981    time_t atime;
2982    time_t btime;
2983    const struct peer_atom * a = * (const struct peer_atom**) va;
2984    const struct peer_atom * b = * (const struct peer_atom**) vb;
2985    const int data_time_cutoff_secs = 60 * 60;
2986
2987    assert( tr_isAtom( a ) );
2988    assert( tr_isAtom( b ) );
2989
2990    /* primary key: the last piece data time *if* it was within the last hour */
2991    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
2992    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
2993    if( atime != btime )
2994        return atime > btime ? -1 : 1;
2995
2996    /* secondary key: shelf date. */
2997    if( a->shelf_date != b->shelf_date )
2998        return a->shelf_date > b->shelf_date ? -1 : 1;
2999
3000    return 0;
3001}
3002
3003static int
3004getMaxAtomCount( const tr_torrent * tor )
3005{
3006    /* FIXME: this curve should be smoother... */
3007    const int n = tor->maxConnectedPeers;
3008    if( n >= 200 ) return n * 1.5;
3009    if( n >= 100 ) return n * 2;
3010    if( n >=  50 ) return n * 3;
3011    if( n >=  20 ) return n * 5;
3012    return n * 10;
3013}
3014
3015static int
3016atomPulse( void * vmgr )
3017{
3018    tr_torrent * tor = NULL;
3019    tr_peerMgr * mgr = vmgr;
3020    managerLock( mgr );
3021
3022    while(( tor = tr_torrentNext( mgr->session, tor )))
3023    {
3024        int atomCount;
3025        Torrent * t = tor->torrentPeers;
3026        const int maxAtomCount = getMaxAtomCount( tor );
3027        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3028
3029        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3030        {
3031            int i;
3032            int keepCount = 0;
3033            int testCount = 0;
3034            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3035            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3036
3037            /* keep the ones that are in use */
3038            for( i=0; i<atomCount; ++i ) {
3039                struct peer_atom * atom = atoms[i];
3040                if( peerIsInUse( t, atom ) )
3041                    keep[keepCount++] = atom;
3042                else
3043                    test[testCount++] = atom;
3044            }
3045
3046            /* if there's room, keep the best of what's left */
3047            i = 0;
3048            if( keepCount < maxAtomCount ) {
3049                tr_now = tr_time( );
3050                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3051                while( i<testCount && keepCount<maxAtomCount )
3052                    keep[keepCount++] = test[i++];
3053            }
3054
3055            /* free the culled atoms */
3056            while( i<testCount )
3057                tr_free( test[i++] );
3058
3059            /* rebuild Torrent.pool with what's left */
3060            tr_ptrArrayDestruct( &t->pool, NULL );
3061            t->pool = TR_PTR_ARRAY_INIT;
3062            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3063            for( i=0; i<keepCount; ++i )
3064                tr_ptrArrayAppend( &t->pool, keep[i] );
3065
3066            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3067
3068            /* cleanup */
3069            tr_free( test );
3070            tr_free( keep );
3071        }
3072    }
3073
3074    managerUnlock( mgr );
3075    return TRUE;
3076}
Note: See TracBrowser for help on using the repository browser.