source: trunk/libtransmission/peer-mgr.c @ 9592

Last change on this file since 9592 was 9592, checked in by charles, 13 years ago

(trunk libT) #2430 "peer atom pool grows too large" -- tweak the default atom shelf lives based on discussion in the ticket's comments section

  • Property svn:keywords set to Date Rev Author Id
File size: 86.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9592 2009-11-26 18:38:37Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* minimum interval for refilling peers' request lists */
51    REFILL_PERIOD_MSEC = 400,
52
53    /* how frequently to reallocate bandwidth */
54    BANDWIDTH_PERIOD_MSEC = 500,
55
56    /* how frequently to age out old piece request lists */
57    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
58
59    /* how frequently to decide which peers live and die */
60    RECONNECT_PERIOD_MSEC = 500,
61
62    /* when many peers are available, keep idle ones this long */
63    MIN_UPLOAD_IDLE_SECS = ( 60 ),
64
65    /* when few peers are available, keep idle ones this long */
66    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
67
68    /* max # of peers to ask fer per torrent per reconnect pulse */
69    MAX_RECONNECTIONS_PER_PULSE = 8,
70
71    /* max number of peers to ask for per second overall.
72    * this throttle is to avoid overloading the router */
73    MAX_CONNECTIONS_PER_SECOND = 16,
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.myflags */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5
92};
93
94
95/**
96***
97**/
98
99enum
100{
101    UPLOAD_ONLY_UKNOWN,
102    UPLOAD_ONLY_YES,
103    UPLOAD_ONLY_NO
104};
105
106/**
107 * Peer information that should be kept even before we've connected and
108 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
109 * which ones would make good candidates for connecting to, and to watch out
110 * for banned peers.
111 *
112 * @see tr_peer
113 * @see tr_peermsgs
114 */
115struct peer_atom
116{
117    uint8_t     from;
118    uint8_t     flags;       /* these match the added_f flags */
119    uint8_t     myflags;     /* flags that aren't defined in added_f */
120    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
121    tr_port     port;
122    uint16_t    numFails;
123    tr_address  addr;
124    time_t      time;        /* when the peer's connection status last changed */
125    time_t      piece_data_time;
126
127    /* similar to a TTL field, but less rigid --
128     * if the swarm is small, the atom will be kept past this date. */
129    time_t      shelf_date;
130};
131
132static tr_bool
133tr_isAtom( const struct peer_atom * atom )
134{
135    return ( atom != NULL )
136        && ( atom->from < TR_PEER_FROM__MAX )
137        && ( tr_isAddress( &atom->addr ) );
138}
139
140static const char*
141tr_atomAddrStr( const struct peer_atom * atom )
142{
143    return tr_peerIoAddrStr( &atom->addr, atom->port );
144}
145
146struct block_request
147{
148    tr_block_index_t block;
149    tr_peer * peer;
150    time_t sentAt;
151};
152
153struct weighted_piece
154{
155    tr_piece_index_t index;
156    int16_t salt;
157    int16_t requestCount;
158};
159
160typedef struct tr_torrent_peers
161{
162    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
163    tr_ptrArray                pool; /* struct peer_atom */
164    tr_ptrArray                peers; /* tr_peer */
165    tr_ptrArray                webseeds; /* tr_webseed */
166
167    tr_torrent               * tor;
168    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
169    struct tr_peerMgr        * manager;
170
171    tr_bool                    isRunning;
172    tr_bool                    needsCompletenessCheck;
173
174    struct block_request     * requests;
175    int                        requestsSort;
176    int                        requestCount;
177    int                        requestAlloc;
178
179    struct weighted_piece    * pieces;
180    int                        piecesSort;
181    int                        pieceCount;
182
183    tr_bool                    isInEndgame;
184}
185Torrent;
186
187struct tr_peerMgr
188{
189    tr_session      * session;
190    tr_ptrArray       incomingHandshakes; /* tr_handshake */
191    tr_timer        * bandwidthTimer;
192    tr_timer        * rechokeTimer;
193    tr_timer        * reconnectTimer;
194    tr_timer        * refillUpkeepTimer;
195    tr_timer        * atomTimer;
196};
197
198#define tordbg( t, ... ) \
199    do { \
200        if( tr_deepLoggingIsActive( ) ) \
201            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
202    } while( 0 )
203
204#define dbgmsg( ... ) \
205    do { \
206        if( tr_deepLoggingIsActive( ) ) \
207            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
208    } while( 0 )
209
210/**
211***
212**/
213
214static TR_INLINE void
215managerLock( const struct tr_peerMgr * manager )
216{
217    tr_globalLock( manager->session );
218}
219
220static TR_INLINE void
221managerUnlock( const struct tr_peerMgr * manager )
222{
223    tr_globalUnlock( manager->session );
224}
225
226static TR_INLINE void
227torrentLock( Torrent * torrent )
228{
229    managerLock( torrent->manager );
230}
231
232static TR_INLINE void
233torrentUnlock( Torrent * torrent )
234{
235    managerUnlock( torrent->manager );
236}
237
238static TR_INLINE int
239torrentIsLocked( const Torrent * t )
240{
241    return tr_globalIsLocked( t->manager->session );
242}
243
244/**
245***
246**/
247
248static int
249handshakeCompareToAddr( const void * va, const void * vb )
250{
251    const tr_handshake * a = va;
252
253    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
254}
255
256static int
257handshakeCompare( const void * a, const void * b )
258{
259    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
260}
261
262static tr_handshake*
263getExistingHandshake( tr_ptrArray      * handshakes,
264                      const tr_address * addr )
265{
266    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
267}
268
269static int
270comparePeerAtomToAddress( const void * va, const void * vb )
271{
272    const struct peer_atom * a = va;
273
274    return tr_compareAddresses( &a->addr, vb );
275}
276
277static int
278compareAtomsByAddress( const void * va, const void * vb )
279{
280    const struct peer_atom * b = vb;
281
282    assert( tr_isAtom( b ) );
283
284    return comparePeerAtomToAddress( va, &b->addr );
285}
286
287/**
288***
289**/
290
291const tr_address *
292tr_peerAddress( const tr_peer * peer )
293{
294    return &peer->atom->addr;
295}
296
297static Torrent*
298getExistingTorrent( tr_peerMgr *    manager,
299                    const uint8_t * hash )
300{
301    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
302
303    return tor == NULL ? NULL : tor->torrentPeers;
304}
305
306static int
307peerCompare( const void * a, const void * b )
308{
309    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
310}
311
312static int
313peerCompareToAddr( const void * a, const void * vb )
314{
315    return tr_compareAddresses( tr_peerAddress( a ), vb );
316}
317
318static tr_peer*
319getExistingPeer( Torrent          * torrent,
320                 const tr_address * addr )
321{
322    assert( torrentIsLocked( torrent ) );
323    assert( addr );
324
325    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
326}
327
328static struct peer_atom*
329getExistingAtom( const Torrent    * t,
330                 const tr_address * addr )
331{
332    Torrent * tt = (Torrent*)t;
333    assert( torrentIsLocked( t ) );
334    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
335}
336
337static tr_bool
338peerIsInUse( const Torrent    * ct,
339             const tr_address * addr )
340{
341    Torrent * t = (Torrent*) ct;
342
343    assert( torrentIsLocked ( t ) );
344
345    return getExistingPeer( t, addr )
346        || getExistingHandshake( &t->outgoingHandshakes, addr )
347        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
348}
349
350static tr_peer*
351peerConstructor( struct peer_atom * atom )
352{
353    tr_peer * peer = tr_new0( tr_peer, 1 );
354    tr_bitsetConstructor( &peer->have, 0 );
355    peer->atom = atom;
356    return peer;
357}
358
359static tr_peer*
360getPeer( Torrent * torrent, struct peer_atom * atom )
361{
362    tr_peer * peer;
363
364    assert( torrentIsLocked( torrent ) );
365
366    peer = getExistingPeer( torrent, &atom->addr );
367
368    if( peer == NULL )
369    {
370        peer = peerConstructor( atom );
371        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
372    }
373
374    return peer;
375}
376
377static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
378
379static void
380peerDestructor( Torrent * t, tr_peer * peer )
381{
382    assert( peer != NULL );
383
384    peerDeclinedAllRequests( t, peer );
385
386    if( peer->msgs != NULL )
387    {
388        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
389        tr_peerMsgsFree( peer->msgs );
390    }
391
392    tr_peerIoClear( peer->io );
393    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
394
395    tr_bitsetDestructor( &peer->have );
396    tr_bitfieldFree( peer->blame );
397    tr_free( peer->client );
398
399    tr_free( peer );
400}
401
402static void
403removePeer( Torrent * t, tr_peer * peer )
404{
405    tr_peer * removed;
406    struct peer_atom * atom = peer->atom;
407
408    assert( torrentIsLocked( t ) );
409    assert( atom );
410
411    atom->time = time( NULL );
412
413    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
414    assert( removed == peer );
415    peerDestructor( t, removed );
416}
417
418static void
419removeAllPeers( Torrent * t )
420{
421    while( !tr_ptrArrayEmpty( &t->peers ) )
422        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
423}
424
425static void
426torrentDestructor( void * vt )
427{
428    Torrent * t = vt;
429
430    assert( t );
431    assert( !t->isRunning );
432    assert( torrentIsLocked( t ) );
433    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
434    assert( tr_ptrArrayEmpty( &t->peers ) );
435
436    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
437    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
438    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
439    tr_ptrArrayDestruct( &t->peers, NULL );
440
441    tr_free( t->requests );
442    tr_free( t->pieces );
443    tr_free( t );
444}
445
446static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
447
448static Torrent*
449torrentConstructor( tr_peerMgr * manager,
450                    tr_torrent * tor )
451{
452    int       i;
453    Torrent * t;
454
455    t = tr_new0( Torrent, 1 );
456    t->manager = manager;
457    t->tor = tor;
458    t->pool = TR_PTR_ARRAY_INIT;
459    t->peers = TR_PTR_ARRAY_INIT;
460    t->webseeds = TR_PTR_ARRAY_INIT;
461    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
462    t->requests = 0;
463
464    for( i = 0; i < tor->info.webseedCount; ++i )
465    {
466        tr_webseed * w =
467            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
468        tr_ptrArrayAppend( &t->webseeds, w );
469    }
470
471    return t;
472}
473
474tr_peerMgr*
475tr_peerMgrNew( tr_session * session )
476{
477    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
478    m->session = session;
479    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
480    return m;
481}
482
483static void
484deleteTimers( struct tr_peerMgr * m )
485{
486    if( m->atomTimer )
487        tr_timerFree( &m->atomTimer );
488
489    if( m->bandwidthTimer )
490        tr_timerFree( &m->bandwidthTimer );
491
492    if( m->rechokeTimer )
493        tr_timerFree( &m->rechokeTimer );
494
495    if( m->reconnectTimer )
496        tr_timerFree( &m->reconnectTimer );
497
498    if( m->refillUpkeepTimer )
499        tr_timerFree( &m->refillUpkeepTimer );
500}
501
502void
503tr_peerMgrFree( tr_peerMgr * manager )
504{
505    managerLock( manager );
506
507    deleteTimers( manager );
508
509    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
510     * the item from manager->handshakes, so this is a little roundabout... */
511    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
512        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
513
514    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
515
516    managerUnlock( manager );
517    tr_free( manager );
518}
519
520static int
521clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
522{
523    if( !tr_torrentHasMetadata( tor ) )
524        return TRUE;
525
526    return peer->clientIsInterested && !peer->clientIsChoked;
527}
528
529static int
530clientIsUploadingTo( const tr_peer * peer )
531{
532    return peer->peerIsInterested && !peer->peerIsChoked;
533}
534
535/***
536****
537***/
538
539tr_bool
540tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
541                      const tr_address  * addr )
542{
543    tr_bool isSeed = FALSE;
544    const Torrent * t = tor->torrentPeers;
545    const struct peer_atom * atom = getExistingAtom( t, addr );
546
547    if( atom )
548        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
549
550    return isSeed;
551}
552
553/**
554***  REQUESTS
555***
556*** There are two data structures associated with managing block requests:
557***
558*** 1. Torrent::requests, an array of "struct block_request" which keeps
559***    track of which blocks have been requested, and when, and by which peers.
560***    This is list is used for (a) cancelling requests that have been pending
561***    for too long and (b) avoiding duplicate requests before endgame.
562***
563*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
564***    pieces that we want to request.  It's used to decide which pieces to
565***    return next when tr_peerMgrGetBlockRequests() is called.
566**/
567
568/**
569*** struct block_request
570**/
571
572enum
573{
574    REQ_UNSORTED,
575    REQ_SORTED_BY_BLOCK,
576    REQ_SORTED_BY_TIME
577};
578
579static int
580compareReqByBlock( const void * va, const void * vb )
581{
582    const struct block_request * a = va;
583    const struct block_request * b = vb;
584    if( a->block < b->block ) return -1;
585    if( a->block > b->block ) return 1;
586    return 0;
587}
588
589static int
590compareReqByTime( const void * va, const void * vb )
591{
592    const struct block_request * a = va;
593    const struct block_request * b = vb;
594    if( a->sentAt < b->sentAt ) return -1;
595    if( a->sentAt > b->sentAt ) return 1;
596    return 0;
597}
598
599static void
600requestListSort( Torrent * t, int mode )
601{
602    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
603
604    if( t->requestsSort != mode )
605    {
606        int(*compar)(const void *, const void *);
607
608        t->requestsSort = mode;
609
610        switch( mode ) {
611            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
612            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
613            default: assert( 0 && "unhandled" );
614        }
615
616        qsort( t->requests, t->requestCount,
617               sizeof( struct block_request ), compar );
618    }
619}
620
621static void
622requestListAdd( Torrent * t, const time_t now, tr_block_index_t block, tr_peer * peer )
623{
624    struct block_request key;
625
626    /* ensure enough room is available... */
627    if( t->requestCount + 1 >= t->requestAlloc )
628    {
629        const int CHUNK_SIZE = 128;
630        t->requestAlloc += CHUNK_SIZE;
631        t->requests = tr_renew( struct block_request,
632                                t->requests, t->requestAlloc );
633    }
634
635    /* populate the record we're inserting */
636    key.block = block;
637    key.peer = peer;
638    key.sentAt = now;
639
640    /* insert the request to our array... */
641    switch( t->requestsSort )
642    {
643        case REQ_UNSORTED:
644        case REQ_SORTED_BY_TIME:
645            t->requests[t->requestCount++] = key;
646            break;
647
648        case REQ_SORTED_BY_BLOCK: {
649            tr_bool exact;
650            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
651                                           sizeof( struct block_request ),
652                                           compareReqByBlock, &exact );
653            assert( !exact );
654            memmove( t->requests + pos + 1,
655                     t->requests + pos,
656                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
657            t->requests[pos] = key;
658            break;
659        }
660    }
661}
662
663static struct block_request *
664requestListLookup( Torrent * t, tr_block_index_t block )
665{
666    struct block_request key;
667    key.block = block;
668
669    requestListSort( t, REQ_SORTED_BY_BLOCK );
670
671    return bsearch( &key, t->requests, t->requestCount,
672                    sizeof( struct block_request ),
673                    compareReqByBlock );
674}
675
676static void
677requestListRemove( Torrent * t, tr_block_index_t block )
678{
679    const struct block_request * b = requestListLookup( t, block );
680    if( b != NULL )
681    {
682        const int pos = b - t->requests;
683        assert( pos < t->requestCount );
684        memmove( t->requests + pos,
685                 t->requests + pos + 1,
686                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
687    }
688}
689
690/**
691*** struct weighted_piece
692**/
693
694enum
695{
696    PIECES_UNSORTED,
697    PIECES_SORTED_BY_INDEX,
698    PIECES_SORTED_BY_WEIGHT
699};
700
701const tr_torrent * weightTorrent;
702
703/* we try to create a "weight" s.t. high-priority pieces come before others,
704 * and that partially-complete pieces come before empty ones. */
705static int
706comparePieceByWeight( const void * va, const void * vb )
707{
708    const struct weighted_piece * a = va;
709    const struct weighted_piece * b = vb;
710    int ia, ib, missing, pending;
711    const tr_torrent * tor = weightTorrent;
712
713    /* primary key: weight */
714    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
715    pending = a->requestCount;
716    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
717    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
718    pending = b->requestCount;
719    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
720    if( ia < ib ) return -1;
721    if( ia > ib ) return 1;
722
723    /* secondary key: higher priorities go first */
724    ia = tor->info.pieces[a->index].priority;
725    ib = tor->info.pieces[b->index].priority;
726    if( ia > ib ) return -1;
727    if( ia < ib ) return 1;
728
729    /* tertiary key: random */
730    return a->salt - b->salt;
731}
732
733static int
734comparePieceByIndex( const void * va, const void * vb )
735{
736    const struct weighted_piece * a = va;
737    const struct weighted_piece * b = vb;
738    if( a->index < b->index ) return -1;
739    if( a->index > b->index ) return 1;
740    return 0;
741}
742
743static void
744pieceListSort( Torrent * t, int mode )
745{
746    int(*compar)(const void *, const void *);
747
748    assert( mode==PIECES_SORTED_BY_INDEX
749         || mode==PIECES_SORTED_BY_WEIGHT );
750
751    if( t->piecesSort != mode )
752    {
753        t->piecesSort = mode;
754
755        switch( mode ) {
756            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
757            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
758            default: assert( 0 && "unhandled" );  break;
759        }
760
761        weightTorrent = t->tor;
762        qsort( t->pieces, t->pieceCount,
763               sizeof( struct weighted_piece ), compar );
764    }
765
766    /* Also, as long as we've got the pieces sorted by weight,
767     * let's also update t.isInEndgame */
768    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
769    {
770        tr_bool endgame = TRUE;
771
772        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
773        {
774            const tr_completion * cp = &t->tor->completion;
775            const struct weighted_piece * p = t->pieces;
776            const int pending = p->requestCount;
777            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
778            endgame = pending >= missing;
779        }
780
781        t->isInEndgame = endgame;
782    }
783}
784
785static struct weighted_piece *
786pieceListLookup( Torrent * t, tr_piece_index_t index )
787{
788    struct weighted_piece key;
789    key.index = index;
790
791    pieceListSort( t, PIECES_SORTED_BY_INDEX );
792
793    return bsearch( &key, t->pieces, t->pieceCount,
794                    sizeof( struct weighted_piece ),
795                    comparePieceByIndex );
796}
797
798static void
799pieceListRebuild( Torrent * t )
800{
801    if( !tr_torrentIsSeed( t->tor ) )
802    {
803        tr_piece_index_t i;
804        tr_piece_index_t * pool;
805        tr_piece_index_t poolCount = 0;
806        const tr_torrent * tor = t->tor;
807        const tr_info * inf = tr_torrentInfo( tor );
808        struct weighted_piece * pieces;
809        int pieceCount;
810
811        /* build the new list */
812        pool = tr_new( tr_piece_index_t, inf->pieceCount );
813        for( i=0; i<inf->pieceCount; ++i )
814            if( !inf->pieces[i].dnd )
815                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
816                    pool[poolCount++] = i;
817        pieceCount = poolCount;
818        pieces = tr_new0( struct weighted_piece, pieceCount );
819        for( i=0; i<poolCount; ++i ) {
820            struct weighted_piece * piece = pieces + i;
821            piece->index = pool[i];
822            piece->requestCount = 0;
823            piece->salt = tr_cryptoWeakRandInt( 255 );
824        }
825
826        /* if we already had a list of pieces, merge it into
827         * the new list so we don't lose its requestCounts */
828        if( t->pieces != NULL )
829        {
830            struct weighted_piece * o = t->pieces;
831            struct weighted_piece * oend = o + t->pieceCount;
832            struct weighted_piece * n = pieces;
833            struct weighted_piece * nend = n + pieceCount;
834
835            pieceListSort( t, PIECES_SORTED_BY_INDEX );
836
837            while( o!=oend && n!=nend ) {
838                if( o->index < n->index )
839                    ++o;
840                else if( o->index > n->index )
841                    ++n;
842                else
843                    *n++ = *o++;
844            }
845
846            tr_free( t->pieces );
847        }
848
849        t->pieces = pieces;
850        t->pieceCount = pieceCount;
851        t->piecesSort = PIECES_SORTED_BY_INDEX;
852
853        /* cleanup */
854        tr_free( pool );
855    }
856}
857
858static void
859pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
860{
861    struct weighted_piece * p = pieceListLookup( t, piece );
862
863    if( p != NULL )
864    {
865        const int pos = p - t->pieces;
866
867        memmove( t->pieces + pos,
868                 t->pieces + pos + 1,
869                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
870
871        if( t->pieceCount == 0 )
872        {
873            tr_free( t->pieces );
874            t->pieces = NULL;
875        }
876    }
877}
878
879static void
880pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
881{
882    struct weighted_piece * p;
883    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
884
885    if(( p = pieceListLookup( t, index )))
886        if( p->requestCount > 0 )
887            --p->requestCount;
888
889    /* note: this invalidates the weighted.piece.weight field,
890     * but that's OK since the call to pieceListLookup ensured
891     * that we were sorted by index anyway.. next time we resort
892     * by weight, pieceListSort() will update the weights */
893}
894
895/**
896***
897**/
898
899void
900tr_peerMgrRebuildRequests( tr_torrent * tor )
901{
902    assert( tr_isTorrent( tor ) );
903
904    pieceListRebuild( tor->torrentPeers );
905}
906
907void
908tr_peerMgrGetNextRequests( tr_torrent           * tor,
909                           tr_peer              * peer,
910                           int                    numwant,
911                           tr_block_index_t     * setme,
912                           int                  * numgot )
913{
914    int i;
915    int got;
916    Torrent * t;
917    struct weighted_piece * pieces;
918    const tr_bitset * have = &peer->have;
919    const time_t now = time( NULL );
920
921    /* sanity clause */
922    assert( tr_isTorrent( tor ) );
923    assert( numwant > 0 );
924
925    /* walk through the pieces and find blocks that should be requested */
926    got = 0;
927    t = tor->torrentPeers;
928
929    /* prep the pieces list */
930    if( t->pieces == NULL )
931        pieceListRebuild( t );
932    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
933
934    pieces = t->pieces;
935    for( i=0; i<t->pieceCount && got<numwant; ++i )
936    {
937        struct weighted_piece * p = pieces + i;
938
939        /* if the peer has this piece that we want... */
940        if( tr_bitsetHasFast( have, p->index ) )
941        {
942            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
943            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
944
945            for( ; b!=e && got<numwant; ++b )
946            {
947                struct block_request * breq;
948
949                /* don't request blocks we've already got */
950                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
951                    continue;
952
953                /* don't request blocks we've already requested (FIXME) */
954                breq = requestListLookup( t, b );
955                if( breq != NULL ) {
956                    assert( breq->peer != NULL );
957                    if( breq->peer == peer ) continue;
958                    if( !t->isInEndgame ) continue;
959                }
960
961                setme[got++] = b;
962
963                /* update our own tables */
964                if( breq == NULL )
965                    requestListAdd( t, now, b, peer );
966                ++p->requestCount;
967            }
968        }
969    }
970
971    /* We almost always change only a handful of pieces in the array.
972     * In these cases, it's cheaper to sort those changed pieces and merge,
973     * than qsort()ing the whole array again */
974    if( got > 0 )
975    {
976        struct weighted_piece * p;
977        struct weighted_piece * pieces;
978        struct weighted_piece * a = t->pieces;
979        struct weighted_piece * a_end = t->pieces + i;
980        struct weighted_piece * b = a_end;
981        struct weighted_piece * b_end = t->pieces + t->pieceCount;
982
983        /* rescore the pieces that we changed */
984        weightTorrent = t->tor;
985        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
986
987        /* allocate a new array */
988        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
989
990        /* merge the two sorted arrays into this new array */
991        weightTorrent = t->tor;
992        while( a!=a_end && b!=b_end )
993            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
994        while( a!=a_end ) *p++ = *a++;
995        while( b!=b_end ) *p++ = *b++;
996
997#if 0
998        /* make sure we did it right */
999        assert( p - pieces == t->pieceCount );
1000        for( it=pieces; it+1<p; ++it )
1001            assert( it->weight <= it[1].weight );
1002#endif
1003
1004        /* update */
1005        tr_free( t->pieces );
1006        t->pieces = pieces;
1007    }
1008
1009    *numgot = got;
1010}
1011
1012tr_bool
1013tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1014                          const tr_peer     * peer,
1015                          tr_block_index_t    block )
1016{
1017    const Torrent * t = tor->torrentPeers;
1018    const struct block_request * b = requestListLookup( (Torrent*)t, block );
1019    if( b == NULL ) return FALSE;
1020    if( b->peer == peer ) return TRUE;
1021    if( t->isInEndgame ) return TRUE;
1022    return FALSE;
1023}
1024
1025/* cancel requests that are too old */
1026static int
1027refillUpkeep( void * vmgr )
1028{
1029    time_t now;
1030    time_t too_old;
1031    tr_torrent * tor;
1032    tr_peerMgr * mgr = vmgr;
1033    managerLock( mgr );
1034
1035    now = time( NULL );
1036    too_old = now - REQUEST_TTL_SECS;
1037
1038    tor = NULL;
1039    while(( tor = tr_torrentNext( mgr->session, tor )))
1040    {
1041        Torrent * t = tor->torrentPeers;
1042        const int n = t->requestCount;
1043        if( n > 0 )
1044        {
1045            int keepCount = 0;
1046            int cancelCount = 0;
1047            struct block_request * keep = tr_new( struct block_request, n );
1048            struct block_request * cancel = tr_new( struct block_request, n );
1049            const struct block_request * it;
1050            const struct block_request * end;
1051
1052            for( it=t->requests, end=it+n; it!=end; ++it )
1053                if( it->sentAt <= too_old )
1054                    cancel[cancelCount++] = *it;
1055                else
1056                    keep[keepCount++] = *it;
1057
1058            /* prune out the ones we aren't keeping */
1059            tr_free( t->requests );
1060            t->requests = keep;
1061            t->requestCount = keepCount;
1062            t->requestAlloc = n;
1063
1064            /* send cancel messages for all the "cancel" ones */
1065            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1066                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1067                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1068
1069            /* decrement the pending request counts for the timed-out blocks */
1070            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1071                pieceListRemoveRequest( t, it->block );
1072
1073            /* cleanup loop */
1074            tr_free( cancel );
1075        }
1076    }
1077
1078    managerUnlock( mgr );
1079    return TRUE;
1080}
1081
1082static void
1083addStrike( Torrent * t, tr_peer * peer )
1084{
1085    tordbg( t, "increasing peer %s strike count to %d",
1086            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1087
1088    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1089    {
1090        struct peer_atom * atom = peer->atom;
1091        atom->myflags |= MYFLAG_BANNED;
1092        peer->doPurge = 1;
1093        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1094    }
1095}
1096
1097static void
1098gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1099{
1100    tr_torrent *   tor = t->tor;
1101    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1102
1103    tor->corruptCur += byteCount;
1104    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1105}
1106
1107static void
1108peerSuggestedPiece( Torrent            * t UNUSED,
1109                    tr_peer            * peer UNUSED,
1110                    tr_piece_index_t     pieceIndex UNUSED,
1111                    int                  isFastAllowed UNUSED )
1112{
1113#if 0
1114    assert( t );
1115    assert( peer );
1116    assert( peer->msgs );
1117
1118    /* is this a valid piece? */
1119    if(  pieceIndex >= t->tor->info.pieceCount )
1120        return;
1121
1122    /* don't ask for it if we've already got it */
1123    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1124        return;
1125
1126    /* don't ask for it if they don't have it */
1127    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1128        return;
1129
1130    /* don't ask for it if we're choked and it's not fast */
1131    if( !isFastAllowed && peer->clientIsChoked )
1132        return;
1133
1134    /* request the blocks that we don't have in this piece */
1135    {
1136        tr_block_index_t block;
1137        const tr_torrent * tor = t->tor;
1138        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1139        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1140
1141        for( block=start; block<end; ++block )
1142        {
1143            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1144            {
1145                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1146                const uint32_t length = tr_torBlockCountBytes( tor, block );
1147                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1148                incrementPieceRequests( t, pieceIndex );
1149            }
1150        }
1151    }
1152#endif
1153}
1154
1155static void
1156decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1157{
1158    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1159}
1160
1161static void
1162clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1163{
1164    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1165}
1166
1167static void
1168removeRequestFromTables( Torrent * t, tr_block_index_t block )
1169{
1170    requestListRemove( t, block );
1171    pieceListRemoveRequest( t, block );
1172}
1173
1174/* peer choked us, or maybe it disconnected.
1175   either way we need to remove all its requests */
1176static void
1177peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1178{
1179    int i, n;
1180    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1181
1182    for( i=n=0; i<t->requestCount; ++i )
1183        if( peer == t->requests[i].peer )
1184            blocks[n++] = t->requests[i].block;
1185
1186    for( i=0; i<n; ++i )
1187        removeRequestFromTables( t, blocks[i] );
1188
1189    tr_free( blocks );
1190}
1191
1192static void
1193peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1194{
1195    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1196    Torrent * t = vt;
1197    const tr_peer_event * e = vevent;
1198
1199    torrentLock( t );
1200
1201    switch( e->eventType )
1202    {
1203        case TR_PEER_UPLOAD_ONLY:
1204            /* update our atom */
1205            if( peer ) {
1206                if( e->uploadOnly ) {
1207                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1208                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1209                } else {
1210                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1211                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1212                }
1213            }
1214            break;
1215
1216        case TR_PEER_PEER_GOT_DATA:
1217        {
1218            const time_t now = time( NULL );
1219            tr_torrent * tor = t->tor;
1220
1221            tr_torrentSetActivityDate( tor, now );
1222
1223            if( e->wasPieceData ) {
1224                tor->uploadedCur += e->length;
1225                tr_torrentSetDirty( tor );
1226            }
1227
1228            /* update the stats */
1229            if( e->wasPieceData )
1230                tr_statsAddUploaded( tor->session, e->length );
1231
1232            /* update our atom */
1233            if( peer && e->wasPieceData )
1234                peer->atom->piece_data_time = now;
1235
1236            tor->needsSeedRatioCheck = TRUE;
1237
1238            break;
1239        }
1240
1241        case TR_PEER_CLIENT_GOT_REJ:
1242            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) );
1243            break;
1244
1245        case TR_PEER_CLIENT_GOT_CHOKE:
1246            peerDeclinedAllRequests( t, peer );
1247            break;
1248
1249        case TR_PEER_CLIENT_GOT_PORT:
1250            if( peer )
1251                peer->atom->port = e->port;
1252            break;
1253
1254        case TR_PEER_CLIENT_GOT_SUGGEST:
1255            if( peer )
1256                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1257            break;
1258
1259        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1260            if( peer )
1261                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1262            break;
1263
1264        case TR_PEER_CLIENT_GOT_DATA:
1265        {
1266            const time_t now = time( NULL );
1267            tr_torrent * tor = t->tor;
1268
1269            tr_torrentSetActivityDate( tor, now );
1270
1271            /* only add this to downloadedCur if we got it from a peer --
1272             * webseeds shouldn't count against our ratio.  As one tracker
1273             * admin put it, "Those pieces are downloaded directly from the
1274             * content distributor, not the peers, it is the tracker's job
1275             * to manage the swarms, not the web server and does not fit
1276             * into the jurisdiction of the tracker." */
1277            if( peer && e->wasPieceData ) {
1278                tor->downloadedCur += e->length;
1279                tr_torrentSetDirty( tor );
1280            }
1281
1282            /* update the stats */
1283            if( e->wasPieceData )
1284                tr_statsAddDownloaded( tor->session, e->length );
1285
1286            /* update our atom */
1287            if( peer && e->wasPieceData )
1288                peer->atom->piece_data_time = now;
1289
1290            break;
1291        }
1292
1293        case TR_PEER_PEER_PROGRESS:
1294        {
1295            if( peer )
1296            {
1297                struct peer_atom * atom = peer->atom;
1298                if( e->progress >= 1.0 ) {
1299                    tordbg( t, "marking peer %s as a seed",
1300                            tr_atomAddrStr( atom ) );
1301                    atom->flags |= ADDED_F_SEED_FLAG;
1302                }
1303            }
1304            break;
1305        }
1306
1307        case TR_PEER_CLIENT_GOT_BLOCK:
1308        {
1309            tr_torrent * tor = t->tor;
1310            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1311
1312            requestListRemove( t, block );
1313            pieceListRemoveRequest( t, block );
1314
1315            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1316            {
1317                tordbg( t, "we have this block already..." );
1318                clientGotUnwantedBlock( tor, block );
1319            }
1320            else
1321            {
1322                tr_cpBlockAdd( &tor->completion, block );
1323                tr_torrentSetDirty( tor );
1324
1325                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1326                {
1327                    const tr_piece_index_t p = e->pieceIndex;
1328                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1329
1330                    if( !ok )
1331                    {
1332                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1333                                   (unsigned long)p );
1334                    }
1335
1336                    tr_torrentSetHasPiece( tor, p, ok );
1337                    tr_torrentSetPieceChecked( tor, p, TRUE );
1338                    tr_peerMgrSetBlame( tor, p, ok );
1339
1340                    if( !ok )
1341                    {
1342                        gotBadPiece( t, p );
1343                    }
1344                    else
1345                    {
1346                        int i;
1347                        int peerCount;
1348                        tr_peer ** peers;
1349                        tr_file_index_t fileIndex;
1350
1351                        peerCount = tr_ptrArraySize( &t->peers );
1352                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1353                        for( i=0; i<peerCount; ++i )
1354                            tr_peerMsgsHave( peers[i]->msgs, p );
1355
1356                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1357                            const tr_file * file = &tor->info.files[fileIndex];
1358                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1359                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1360                                    tr_torrentFileCompleted( tor, fileIndex );
1361                        }
1362
1363                        pieceListRemovePiece( t, p );
1364                    }
1365                }
1366
1367                t->needsCompletenessCheck = TRUE;
1368            }
1369            break;
1370        }
1371
1372        case TR_PEER_ERROR:
1373            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1374            {
1375                /* some protocol error from the peer */
1376                peer->doPurge = 1;
1377                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1378                        tr_atomAddrStr( peer->atom ) );
1379            }
1380            else
1381            {
1382                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1383            }
1384            break;
1385
1386        default:
1387            assert( 0 );
1388    }
1389
1390    torrentUnlock( t );
1391}
1392
1393static int
1394getDefaultShelfLife( uint8_t from )
1395{
1396    /* in general, peers obtained from firsthand contact
1397     * are better than those from secondhand, etc etc */
1398    switch( from )
1399    {
1400        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1401        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1402        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1403        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1404        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1405        case TR_PEER_FROM_RESUME   : return 60 * 60;
1406        default                    : return 60 * 60;
1407    }
1408}
1409
1410
1411static void
1412ensureAtomExists( Torrent           * t,
1413                  const time_t        now,
1414                  const tr_address  * addr,
1415                  const tr_port       port,
1416                  const uint8_t       flags,
1417                  const uint8_t       from )
1418{
1419    assert( tr_isAddress( addr ) );
1420    assert( from < TR_PEER_FROM__MAX );
1421
1422    if( getExistingAtom( t, addr ) == NULL )
1423    {
1424        struct peer_atom * a;
1425        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1426
1427        a = tr_new0( struct peer_atom, 1 );
1428        a->addr = *addr;
1429        a->port = port;
1430        a->flags = flags;
1431        a->from = from;
1432        a->shelf_date = now + getDefaultShelfLife( from ) + jitter;
1433        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1434
1435        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1436    }
1437}
1438
1439static int
1440getMaxPeerCount( const tr_torrent * tor )
1441{
1442    return tor->maxConnectedPeers;
1443}
1444
1445static int
1446getPeerCount( const Torrent * t )
1447{
1448    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1449}
1450
1451/* FIXME: this is kind of a mess. */
1452static tr_bool
1453myHandshakeDoneCB( tr_handshake  * handshake,
1454                   tr_peerIo     * io,
1455                   tr_bool         isConnected,
1456                   const uint8_t * peer_id,
1457                   void          * vmanager )
1458{
1459    tr_bool            ok = isConnected;
1460    tr_bool            success = FALSE;
1461    tr_port            port;
1462    const tr_address * addr;
1463    tr_peerMgr       * manager = vmanager;
1464    Torrent          * t;
1465    tr_handshake     * ours;
1466
1467    assert( io );
1468    assert( tr_isBool( ok ) );
1469
1470    t = tr_peerIoHasTorrentHash( io )
1471        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1472        : NULL;
1473
1474    if( tr_peerIoIsIncoming ( io ) )
1475        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1476                                        handshake, handshakeCompare );
1477    else if( t )
1478        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1479                                        handshake, handshakeCompare );
1480    else
1481        ours = handshake;
1482
1483    assert( ours );
1484    assert( ours == handshake );
1485
1486    if( t )
1487        torrentLock( t );
1488
1489    addr = tr_peerIoGetAddress( io, &port );
1490
1491    if( !ok || !t || !t->isRunning )
1492    {
1493        if( t )
1494        {
1495            struct peer_atom * atom = getExistingAtom( t, addr );
1496            if( atom )
1497                ++atom->numFails;
1498        }
1499    }
1500    else /* looking good */
1501    {
1502        struct peer_atom * atom;
1503        const time_t now = time( NULL );
1504
1505        ensureAtomExists( t, now, addr, port, 0, TR_PEER_FROM_INCOMING );
1506        atom = getExistingAtom( t, addr );
1507        atom->time = now;
1508        atom->piece_data_time = 0;
1509
1510        if( atom->myflags & MYFLAG_BANNED )
1511        {
1512            tordbg( t, "banned peer %s tried to reconnect",
1513                    tr_atomAddrStr( atom ) );
1514        }
1515        else if( tr_peerIoIsIncoming( io )
1516               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1517
1518        {
1519        }
1520        else
1521        {
1522            tr_peer * peer = getExistingPeer( t, addr );
1523
1524            if( peer ) /* we already have this peer */
1525            {
1526            }
1527            else
1528            {
1529                peer = getPeer( t, atom );
1530                tr_free( peer->client );
1531
1532                if( !peer_id )
1533                    peer->client = NULL;
1534                else {
1535                    char client[128];
1536                    tr_clientForId( client, sizeof( client ), peer_id );
1537                    peer->client = tr_strdup( client );
1538                }
1539
1540                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1541                                                                balanced by our unref in peerDestructor()  */
1542                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1543                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1544
1545                success = TRUE;
1546            }
1547        }
1548    }
1549
1550    if( t )
1551        torrentUnlock( t );
1552
1553    return success;
1554}
1555
1556void
1557tr_peerMgrAddIncoming( tr_peerMgr * manager,
1558                       tr_address * addr,
1559                       tr_port      port,
1560                       int          socket )
1561{
1562    tr_session * session;
1563
1564    managerLock( manager );
1565
1566    assert( tr_isSession( manager->session ) );
1567    session = manager->session;
1568
1569    if( tr_sessionIsAddressBlocked( session, addr ) )
1570    {
1571        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1572        tr_netClose( session, socket );
1573    }
1574    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1575    {
1576        tr_netClose( session, socket );
1577    }
1578    else /* we don't have a connetion to them yet... */
1579    {
1580        tr_peerIo *    io;
1581        tr_handshake * handshake;
1582
1583        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1584
1585        handshake = tr_handshakeNew( io,
1586                                     session->encryptionMode,
1587                                     myHandshakeDoneCB,
1588                                     manager );
1589
1590        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1591
1592        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1593                                 handshakeCompare );
1594    }
1595
1596    managerUnlock( manager );
1597}
1598
1599static tr_bool
1600tr_isPex( const tr_pex * pex )
1601{
1602    return pex && tr_isAddress( &pex->addr );
1603}
1604
1605void
1606tr_peerMgrAddPex( tr_torrent   *  tor,
1607                  uint8_t         from,
1608                  const tr_pex *  pex )
1609{
1610    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1611    {
1612        Torrent * t = tor->torrentPeers;
1613        managerLock( t->manager );
1614
1615        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1616            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1617                ensureAtomExists( t, time( NULL ), &pex->addr, pex->port, pex->flags, from );
1618
1619        managerUnlock( t->manager );
1620    }
1621}
1622
1623tr_pex *
1624tr_peerMgrCompactToPex( const void *    compact,
1625                        size_t          compactLen,
1626                        const uint8_t * added_f,
1627                        size_t          added_f_len,
1628                        size_t *        pexCount )
1629{
1630    size_t          i;
1631    size_t          n = compactLen / 6;
1632    const uint8_t * walk = compact;
1633    tr_pex *        pex = tr_new0( tr_pex, n );
1634
1635    for( i = 0; i < n; ++i )
1636    {
1637        pex[i].addr.type = TR_AF_INET;
1638        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1639        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1640        if( added_f && ( n == added_f_len ) )
1641            pex[i].flags = added_f[i];
1642    }
1643
1644    *pexCount = n;
1645    return pex;
1646}
1647
1648tr_pex *
1649tr_peerMgrCompact6ToPex( const void    * compact,
1650                         size_t          compactLen,
1651                         const uint8_t * added_f,
1652                         size_t          added_f_len,
1653                         size_t        * pexCount )
1654{
1655    size_t          i;
1656    size_t          n = compactLen / 18;
1657    const uint8_t * walk = compact;
1658    tr_pex *        pex = tr_new0( tr_pex, n );
1659
1660    for( i = 0; i < n; ++i )
1661    {
1662        pex[i].addr.type = TR_AF_INET6;
1663        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1664        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1665        if( added_f && ( n == added_f_len ) )
1666            pex[i].flags = added_f[i];
1667    }
1668
1669    *pexCount = n;
1670    return pex;
1671}
1672
1673tr_pex *
1674tr_peerMgrArrayToPex( const void * array,
1675                      size_t       arrayLen,
1676                      size_t      * pexCount )
1677{
1678    size_t          i;
1679    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1680    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1681    const uint8_t * walk = array;
1682    tr_pex        * pex = tr_new0( tr_pex, n );
1683
1684    for( i = 0 ; i < n ; i++ ) {
1685        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1686        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1687        pex[i].flags = 0x00;
1688        walk += sizeof( tr_address ) + 2;
1689    }
1690
1691    *pexCount = n;
1692    return pex;
1693}
1694
1695/**
1696***
1697**/
1698
1699void
1700tr_peerMgrSetBlame( tr_torrent     * tor,
1701                    tr_piece_index_t pieceIndex,
1702                    int              success )
1703{
1704    if( !success )
1705    {
1706        int        peerCount, i;
1707        Torrent *  t = tor->torrentPeers;
1708        tr_peer ** peers;
1709
1710        assert( torrentIsLocked( t ) );
1711
1712        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1713        for( i = 0; i < peerCount; ++i )
1714        {
1715            tr_peer * peer = peers[i];
1716            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1717            {
1718                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1719                        tr_atomAddrStr( peer->atom ),
1720                        pieceIndex, (int)peer->strikes + 1 );
1721                addStrike( t, peer );
1722            }
1723        }
1724    }
1725}
1726
1727int
1728tr_pexCompare( const void * va, const void * vb )
1729{
1730    const tr_pex * a = va;
1731    const tr_pex * b = vb;
1732    int i;
1733
1734    assert( tr_isPex( a ) );
1735    assert( tr_isPex( b ) );
1736
1737    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1738        return i;
1739
1740    if( a->port != b->port )
1741        return a->port < b->port ? -1 : 1;
1742
1743    return 0;
1744}
1745
1746#if 0
1747static int
1748peerPrefersCrypto( const tr_peer * peer )
1749{
1750    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1751        return TRUE;
1752
1753    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1754        return FALSE;
1755
1756    return tr_peerIoIsEncrypted( peer->io );
1757}
1758#endif
1759
1760/* better goes first */
1761static int
1762compareAtomsByUsefulness( const void * va, const void *vb )
1763{
1764    const struct peer_atom * a = * (const struct peer_atom**) va;
1765    const struct peer_atom * b = * (const struct peer_atom**) vb;
1766
1767    assert( tr_isAtom( a ) );
1768    assert( tr_isAtom( b ) );
1769
1770    if( a->piece_data_time != b->piece_data_time )
1771        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1772    if( a->from != b->from )
1773        return a->from < b->from ? -1 : 1;
1774    if( a->numFails != b->numFails )
1775        return a->numFails < b->numFails ? -1 : 1;
1776
1777    return 0;
1778}
1779
1780int
1781tr_peerMgrGetPeers( tr_torrent   * tor,
1782                    tr_pex      ** setme_pex,
1783                    uint8_t        af,
1784                    uint8_t        list_mode,
1785                    int            maxCount )
1786{
1787    int i;
1788    int n;
1789    int count = 0;
1790    int atomCount = 0;
1791    const Torrent * t = tor->torrentPeers;
1792    struct peer_atom ** atoms = NULL;
1793    tr_pex * pex;
1794    tr_pex * walk;
1795
1796    assert( tr_isTorrent( tor ) );
1797    assert( setme_pex != NULL );
1798    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1799    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1800
1801    managerLock( t->manager );
1802
1803    /**
1804    ***  build a list of atoms
1805    **/
1806
1807    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1808    {
1809        int i;
1810        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1811        atomCount = tr_ptrArraySize( &t->peers );
1812        atoms = tr_new( struct peer_atom *, atomCount );
1813        for( i=0; i<atomCount; ++i )
1814            atoms[i] = peers[i]->atom;
1815    }
1816    else /* TR_PEERS_ALL */
1817    {
1818        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1819        atomCount = tr_ptrArraySize( &t->pool );
1820        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1821    }
1822
1823    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1824
1825    /**
1826    ***  add the first N of them into our return list
1827    **/
1828
1829    n = MIN( atomCount, maxCount );
1830    pex = walk = tr_new0( tr_pex, n );
1831
1832    for( i=0; i<atomCount && count<n; ++i )
1833    {
1834        const struct peer_atom * atom = atoms[i];
1835        if( atom->addr.type == af )
1836        {
1837            assert( tr_isAddress( &atom->addr ) );
1838            walk->addr = atom->addr;
1839            walk->port = atom->port;
1840            walk->flags = atom->flags;
1841            ++count;
1842            ++walk;
1843        }
1844    }
1845
1846    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1847
1848    assert( ( walk - pex ) == count );
1849    *setme_pex = pex;
1850
1851    /* cleanup */
1852    tr_free( atoms );
1853    managerUnlock( t->manager );
1854    return count;
1855}
1856
1857static int atomPulse      ( void * vmgr );
1858static int bandwidthPulse ( void * vmgr );
1859static int rechokePulse   ( void * vmgr );
1860static int reconnectPulse ( void * vmgr );
1861
1862static void
1863ensureMgrTimersExist( struct tr_peerMgr * m )
1864{
1865    tr_session * s = m->session;
1866
1867    if( m->atomTimer == NULL )
1868        m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
1869
1870    if( m->bandwidthTimer == NULL )
1871        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1872
1873    if( m->rechokeTimer == NULL )
1874        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1875
1876    if( m->reconnectTimer == NULL )
1877        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1878
1879    if( m->refillUpkeepTimer == NULL )
1880        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1881}
1882
1883void
1884tr_peerMgrStartTorrent( tr_torrent * tor )
1885{
1886    Torrent * t = tor->torrentPeers;
1887
1888    assert( t != NULL );
1889    managerLock( t->manager );
1890    ensureMgrTimersExist( t->manager );
1891
1892    t->isRunning = TRUE;
1893
1894    rechokePulse( t->manager );
1895    managerUnlock( t->manager );
1896}
1897
1898static void
1899stopTorrent( Torrent * t )
1900{
1901    int i, n;
1902
1903    assert( torrentIsLocked( t ) );
1904
1905    t->isRunning = FALSE;
1906
1907    /* disconnect the peers. */
1908    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1909        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1910    tr_ptrArrayClear( &t->peers );
1911
1912    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1913     * which removes the handshake from t->outgoingHandshakes... */
1914    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1915        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1916}
1917
1918void
1919tr_peerMgrStopTorrent( tr_torrent * tor )
1920{
1921    Torrent * t = tor->torrentPeers;
1922
1923    managerLock( t->manager );
1924
1925    stopTorrent( t );
1926
1927    managerUnlock( t->manager );
1928}
1929
1930void
1931tr_peerMgrAddTorrent( tr_peerMgr * manager,
1932                      tr_torrent * tor )
1933{
1934    managerLock( manager );
1935
1936    assert( tor );
1937    assert( tor->torrentPeers == NULL );
1938
1939    tor->torrentPeers = torrentConstructor( manager, tor );
1940
1941    managerUnlock( manager );
1942}
1943
1944void
1945tr_peerMgrRemoveTorrent( tr_torrent * tor )
1946{
1947    tr_torrentLock( tor );
1948
1949    stopTorrent( tor->torrentPeers );
1950    torrentDestructor( tor->torrentPeers );
1951
1952    tr_torrentUnlock( tor );
1953}
1954
1955void
1956tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1957                               int8_t           * tab,
1958                               unsigned int       tabCount )
1959{
1960    tr_piece_index_t   i;
1961    const Torrent *    t;
1962    float              interval;
1963    tr_bool            isSeed;
1964    int                peerCount;
1965    const tr_peer **   peers;
1966    tr_torrentLock( tor );
1967
1968    t = tor->torrentPeers;
1969    tor = t->tor;
1970    interval = tor->info.pieceCount / (float)tabCount;
1971    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1972    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1973    peerCount = tr_ptrArraySize( &t->peers );
1974
1975    memset( tab, 0, tabCount );
1976
1977    for( i = 0; tor && i < tabCount; ++i )
1978    {
1979        const int piece = i * interval;
1980
1981        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1982            tab[i] = -1;
1983        else if( peerCount ) {
1984            int j;
1985            for( j = 0; j < peerCount; ++j )
1986                if( tr_bitsetHas( &peers[j]->have, i ) )
1987                    ++tab[i];
1988        }
1989    }
1990
1991    tr_torrentUnlock( tor );
1992}
1993
1994/* Returns the pieces that are available from peers */
1995tr_bitfield*
1996tr_peerMgrGetAvailable( const tr_torrent * tor )
1997{
1998    int i;
1999    int peerCount;
2000    Torrent * t = tor->torrentPeers;
2001    const tr_peer ** peers;
2002    tr_bitfield * pieces;
2003    managerLock( t->manager );
2004
2005    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2006    peerCount = tr_ptrArraySize( &t->peers );
2007    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2008    for( i=0; i<peerCount; ++i )
2009        tr_bitsetOr( pieces, &peers[i]->have );
2010
2011    managerUnlock( t->manager );
2012    return pieces;
2013}
2014
2015void
2016tr_peerMgrTorrentStats( tr_torrent       * tor,
2017                        int              * setmePeersKnown,
2018                        int              * setmePeersConnected,
2019                        int              * setmeSeedsConnected,
2020                        int              * setmeWebseedsSendingToUs,
2021                        int              * setmePeersSendingToUs,
2022                        int              * setmePeersGettingFromUs,
2023                        int              * setmePeersFrom )
2024{
2025    int i, size;
2026    const Torrent * t = tor->torrentPeers;
2027    const tr_peer ** peers;
2028    const tr_webseed ** webseeds;
2029
2030    managerLock( t->manager );
2031
2032    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2033    size = tr_ptrArraySize( &t->peers );
2034
2035    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2036    *setmePeersConnected       = 0;
2037    *setmeSeedsConnected       = 0;
2038    *setmePeersGettingFromUs   = 0;
2039    *setmePeersSendingToUs     = 0;
2040    *setmeWebseedsSendingToUs  = 0;
2041
2042    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2043        setmePeersFrom[i] = 0;
2044
2045    for( i=0; i<size; ++i )
2046    {
2047        const tr_peer * peer = peers[i];
2048        const struct peer_atom * atom = peer->atom;
2049
2050        if( peer->io == NULL ) /* not connected */
2051            continue;
2052
2053        ++*setmePeersConnected;
2054
2055        ++setmePeersFrom[atom->from];
2056
2057        if( clientIsDownloadingFrom( tor, peer ) )
2058            ++*setmePeersSendingToUs;
2059
2060        if( clientIsUploadingTo( peer ) )
2061            ++*setmePeersGettingFromUs;
2062
2063        if( atom->flags & ADDED_F_SEED_FLAG )
2064            ++*setmeSeedsConnected;
2065    }
2066
2067    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2068    size = tr_ptrArraySize( &t->webseeds );
2069    for( i=0; i<size; ++i )
2070        if( tr_webseedIsActive( webseeds[i] ) )
2071            ++*setmeWebseedsSendingToUs;
2072
2073    managerUnlock( t->manager );
2074}
2075
2076float
2077tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2078{
2079    int i;
2080    float tmp;
2081    float ret = 0;
2082
2083    const Torrent * t = tor->torrentPeers;
2084    const int n = tr_ptrArraySize( &t->webseeds );
2085    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2086
2087    for( i=0; i<n; ++i )
2088        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2089            ret += tmp;
2090
2091    return ret;
2092}
2093
2094
2095float*
2096tr_peerMgrWebSpeeds( const tr_torrent * tor )
2097{
2098    const Torrent * t = tor->torrentPeers;
2099    const tr_webseed ** webseeds;
2100    int i;
2101    int webseedCount;
2102    float * ret;
2103    uint64_t now;
2104
2105    assert( t->manager );
2106    managerLock( t->manager );
2107
2108    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2109    webseedCount = tr_ptrArraySize( &t->webseeds );
2110    assert( webseedCount == tor->info.webseedCount );
2111    ret = tr_new0( float, webseedCount );
2112    now = tr_date( );
2113
2114    for( i=0; i<webseedCount; ++i )
2115        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2116            ret[i] = -1.0;
2117
2118    managerUnlock( t->manager );
2119    return ret;
2120}
2121
2122double
2123tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2124{
2125    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2126}
2127
2128
2129struct tr_peer_stat *
2130tr_peerMgrPeerStats( const tr_torrent    * tor,
2131                     int                 * setmeCount )
2132{
2133    int i, size;
2134    const Torrent * t = tor->torrentPeers;
2135    const tr_peer ** peers;
2136    tr_peer_stat * ret;
2137    uint64_t now;
2138
2139    assert( t->manager );
2140    managerLock( t->manager );
2141
2142    size = tr_ptrArraySize( &t->peers );
2143    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2144    ret = tr_new0( tr_peer_stat, size );
2145    now = tr_date( );
2146
2147    for( i=0; i<size; ++i )
2148    {
2149        char *                   pch;
2150        const tr_peer *          peer = peers[i];
2151        const struct peer_atom * atom = peer->atom;
2152        tr_peer_stat *           stat = ret + i;
2153
2154        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2155        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2156                   sizeof( stat->client ) );
2157        stat->port               = ntohs( peer->atom->port );
2158        stat->from               = atom->from;
2159        stat->progress           = peer->progress;
2160        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2161        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2162        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2163        stat->peerIsChoked       = peer->peerIsChoked;
2164        stat->peerIsInterested   = peer->peerIsInterested;
2165        stat->clientIsChoked     = peer->clientIsChoked;
2166        stat->clientIsInterested = peer->clientIsInterested;
2167        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
2168        stat->isDownloadingFrom  = clientIsDownloadingFrom( tor, peer );
2169        stat->isUploadingTo      = clientIsUploadingTo( peer );
2170        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2171
2172        pch = stat->flagStr;
2173        if( t->optimistic == peer ) *pch++ = 'O';
2174        if( stat->isDownloadingFrom ) *pch++ = 'D';
2175        else if( stat->clientIsInterested ) *pch++ = 'd';
2176        if( stat->isUploadingTo ) *pch++ = 'U';
2177        else if( stat->peerIsInterested ) *pch++ = 'u';
2178        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2179        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2180        if( stat->isEncrypted ) *pch++ = 'E';
2181        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2182        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2183        if( stat->isIncoming ) *pch++ = 'I';
2184        *pch = '\0';
2185    }
2186
2187    *setmeCount = size;
2188
2189    managerUnlock( t->manager );
2190    return ret;
2191}
2192
2193/**
2194***
2195**/
2196
2197struct ChokeData
2198{
2199    tr_bool         doUnchoke;
2200    tr_bool         isInterested;
2201    tr_bool         isChoked;
2202    int             rate;
2203    tr_peer *       peer;
2204};
2205
2206static int
2207compareChoke( const void * va,
2208              const void * vb )
2209{
2210    const struct ChokeData * a = va;
2211    const struct ChokeData * b = vb;
2212
2213    if( a->rate != b->rate ) /* prefer higher overall speeds */
2214        return a->rate > b->rate ? -1 : 1;
2215
2216    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2217        return a->isChoked ? 1 : -1;
2218
2219    return 0;
2220}
2221
2222static int
2223isNew( const tr_peer * peer )
2224{
2225    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2226}
2227
2228static int
2229isSame( const tr_peer * peer )
2230{
2231    return peer && peer->client && strstr( peer->client, "Transmission" );
2232}
2233
2234/**
2235***
2236**/
2237
2238static void
2239rechokeTorrent( Torrent * t, const uint64_t now )
2240{
2241    int i, size, unchokedInterested;
2242    const int peerCount = tr_ptrArraySize( &t->peers );
2243    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2244    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2245    const tr_session * session = t->manager->session;
2246    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2247
2248    assert( torrentIsLocked( t ) );
2249
2250    /* sort the peers by preference and rate */
2251    for( i = 0, size = 0; i < peerCount; ++i )
2252    {
2253        tr_peer * peer = peers[i];
2254        struct peer_atom * atom = peer->atom;
2255
2256        if( peer->progress >= 1.0 ) /* choke all seeds */
2257        {
2258            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2259        }
2260        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2261        {
2262            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2263        }
2264        else if( chokeAll ) /* choke everyone if we're not uploading */
2265        {
2266            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2267        }
2268        else
2269        {
2270            struct ChokeData * n = &choke[size++];
2271            n->peer         = peer;
2272            n->isInterested = peer->peerIsInterested;
2273            n->isChoked     = peer->peerIsChoked;
2274            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2275        }
2276    }
2277
2278    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2279
2280    /**
2281     * Reciprocation and number of uploads capping is managed by unchoking
2282     * the N peers which have the best upload rate and are interested.
2283     * This maximizes the client's download rate. These N peers are
2284     * referred to as downloaders, because they are interested in downloading
2285     * from the client.
2286     *
2287     * Peers which have a better upload rate (as compared to the downloaders)
2288     * but aren't interested get unchoked. If they become interested, the
2289     * downloader with the worst upload rate gets choked. If a client has
2290     * a complete file, it uses its upload rate rather than its download
2291     * rate to decide which peers to unchoke.
2292     */
2293    unchokedInterested = 0;
2294    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2295        choke[i].doUnchoke = 1;
2296        if( choke[i].isInterested )
2297            ++unchokedInterested;
2298    }
2299
2300    /* optimistic unchoke */
2301    if( i < size )
2302    {
2303        int n;
2304        struct ChokeData * c;
2305        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2306
2307        for( ; i<size; ++i )
2308        {
2309            if( choke[i].isInterested )
2310            {
2311                const tr_peer * peer = choke[i].peer;
2312                int x = 1, y;
2313                if( isNew( peer ) ) x *= 3;
2314                if( isSame( peer ) ) x *= 3;
2315                for( y=0; y<x; ++y )
2316                    tr_ptrArrayAppend( &randPool, &choke[i] );
2317            }
2318        }
2319
2320        if(( n = tr_ptrArraySize( &randPool )))
2321        {
2322            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2323            c->doUnchoke = 1;
2324            t->optimistic = c->peer;
2325        }
2326
2327        tr_ptrArrayDestruct( &randPool, NULL );
2328    }
2329
2330    for( i=0; i<size; ++i )
2331        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2332
2333    /* cleanup */
2334    tr_free( choke );
2335}
2336
2337static int
2338rechokePulse( void * vmgr )
2339{
2340    uint64_t now;
2341    tr_torrent * tor = NULL;
2342    tr_peerMgr * mgr = vmgr;
2343    managerLock( mgr );
2344
2345    now = tr_date( );
2346    while(( tor = tr_torrentNext( mgr->session, tor )))
2347        if( tor->isRunning )
2348            rechokeTorrent( tor->torrentPeers, now );
2349
2350    managerUnlock( mgr );
2351    return TRUE;
2352}
2353
2354/***
2355****
2356****  Life and Death
2357****
2358***/
2359
2360typedef enum
2361{
2362    TR_CAN_KEEP,
2363    TR_CAN_CLOSE,
2364    TR_MUST_CLOSE,
2365}
2366tr_close_type_t;
2367
2368static tr_close_type_t
2369shouldPeerBeClosed( const Torrent    * t,
2370                    const tr_peer    * peer,
2371                    int                peerCount,
2372                    const time_t       now )
2373{
2374    const tr_torrent *       tor = t->tor;
2375    const struct peer_atom * atom = peer->atom;
2376
2377    /* if it's marked for purging, close it */
2378    if( peer->doPurge )
2379    {
2380        tordbg( t, "purging peer %s because its doPurge flag is set",
2381                tr_atomAddrStr( atom ) );
2382        return TR_MUST_CLOSE;
2383    }
2384
2385    /* if we're seeding and the peer has everything we have,
2386     * and enough time has passed for a pex exchange, then disconnect */
2387    if( tr_torrentIsSeed( tor ) )
2388    {
2389        int peerHasEverything;
2390        if( atom->flags & ADDED_F_SEED_FLAG )
2391            peerHasEverything = TRUE;
2392        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2393            peerHasEverything = FALSE;
2394        else {
2395            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2396            tr_bitsetDifference( tmp, &peer->have );
2397            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2398            tr_bitfieldFree( tmp );
2399        }
2400
2401        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2402        {
2403            tordbg( t, "purging peer %s because we're both seeds",
2404                    tr_atomAddrStr( atom ) );
2405            return TR_MUST_CLOSE;
2406        }
2407    }
2408
2409    /* disconnect if it's been too long since piece data has been transferred.
2410     * this is on a sliding scale based on number of available peers... */
2411    {
2412        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2413        /* if we have >= relaxIfFewerThan, strictness is 100%.
2414         * if we have zero connections, strictness is 0% */
2415        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2416                               ? 1.0
2417                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2418        const int lo = MIN_UPLOAD_IDLE_SECS;
2419        const int hi = MAX_UPLOAD_IDLE_SECS;
2420        const int limit = hi - ( ( hi - lo ) * strictness );
2421        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2422/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2423        if( idleTime > limit ) {
2424            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2425                       tr_atomAddrStr( atom ), idleTime );
2426            return TR_CAN_CLOSE;
2427        }
2428    }
2429
2430    return TR_CAN_KEEP;
2431}
2432
2433static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2434
2435static tr_peer **
2436getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2437{
2438    int i, peerCount, outsize;
2439    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2440    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2441
2442    assert( torrentIsLocked( t ) );
2443
2444    for( i = outsize = 0; i < peerCount; ++i )
2445        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2446            ret[outsize++] = peers[i];
2447
2448    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2449
2450    *setmeSize = outsize;
2451    return ret;
2452}
2453
2454static int
2455compareCandidates( const void * va, const void * vb )
2456{
2457    const struct peer_atom * a = *(const struct peer_atom**) va;
2458    const struct peer_atom * b = *(const struct peer_atom**) vb;
2459
2460    /* <Charles> Here we would probably want to try reconnecting to
2461     * peers that had most recently given us data. Lots of users have
2462     * trouble with resets due to their routers and/or ISPs. This way we
2463     * can quickly recover from an unwanted reset. So we sort
2464     * piece_data_time in descending order.
2465     */
2466
2467    if( a->piece_data_time != b->piece_data_time )
2468        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2469
2470    if( a->numFails != b->numFails )
2471        return a->numFails < b->numFails ? -1 : 1;
2472
2473    if( a->time != b->time )
2474        return a->time < b->time ? -1 : 1;
2475
2476    /* In order to avoid fragmenting the swarm, peers from trackers and
2477     * from the DHT should be preferred to peers from PEX. */
2478    if( a->from != b->from )
2479        return a->from < b->from ? -1 : 1;
2480
2481    return 0;
2482}
2483
2484static int
2485getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2486{
2487    int sec;
2488
2489    /* if we were recently connected to this peer and transferring piece
2490     * data, try to reconnect to them sooner rather that later -- we don't
2491     * want network troubles to get in the way of a good peer. */
2492    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2493        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2494
2495    /* don't allow reconnects more often than our minimum */
2496    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2497        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2498
2499    /* otherwise, the interval depends on how many times we've tried
2500     * and failed to connect to the peer */
2501    else switch( atom->numFails ) {
2502        case 0: sec = 0; break;
2503        case 1: sec = 5; break;
2504        case 2: sec = 2 * 60; break;
2505        case 3: sec = 15 * 60; break;
2506        case 4: sec = 30 * 60; break;
2507        case 5: sec = 60 * 60; break;
2508        default: sec = 120 * 60; break;
2509    }
2510
2511    return sec;
2512}
2513
2514static struct peer_atom **
2515getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2516{
2517    int                 i, atomCount, retCount;
2518    struct peer_atom ** atoms;
2519    struct peer_atom ** ret;
2520    const int           seed = tr_torrentIsSeed( t->tor );
2521
2522    assert( torrentIsLocked( t ) );
2523
2524    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2525    ret = tr_new( struct peer_atom*, atomCount );
2526    for( i = retCount = 0; i < atomCount; ++i )
2527    {
2528        int                interval;
2529        struct peer_atom * atom = atoms[i];
2530
2531        /* peer fed us too much bad data ... we only keep it around
2532         * now to weed it out in case someone sends it to us via pex */
2533        if( atom->myflags & MYFLAG_BANNED )
2534            continue;
2535
2536        /* peer was unconnectable before, so we're not going to keep trying.
2537         * this is needs a separate flag from `banned', since if they try
2538         * to connect to us later, we'll let them in */
2539        if( atom->myflags & MYFLAG_UNREACHABLE )
2540            continue;
2541
2542        /* we don't need two connections to the same peer... */
2543        if( peerIsInUse( t, &atom->addr ) )
2544            continue;
2545
2546        /* no need to connect if we're both seeds... */
2547        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2548                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2549            continue;
2550
2551        /* don't reconnect too often */
2552        interval = getReconnectIntervalSecs( atom, now );
2553        if( ( now - atom->time ) < interval )
2554        {
2555            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2556                    i, tr_atomAddrStr( atom ), interval );
2557            continue;
2558        }
2559
2560        /* Don't connect to peers in our blocklist */
2561        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2562            continue;
2563
2564        ret[retCount++] = atom;
2565    }
2566
2567    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2568    *setmeSize = retCount;
2569    return ret;
2570}
2571
2572static void
2573closePeer( Torrent * t, tr_peer * peer )
2574{
2575    struct peer_atom * atom;
2576
2577    assert( t != NULL );
2578    assert( peer != NULL );
2579
2580    atom = peer->atom;
2581
2582    /* if we transferred piece data, then they might be good peers,
2583       so reset their `numFails' weight to zero.  otherwise we connected
2584       to them fruitlessly, so mark it as another fail */
2585    if( atom->piece_data_time )
2586        atom->numFails = 0;
2587    else
2588        ++atom->numFails;
2589
2590    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2591    removePeer( t, peer );
2592}
2593
2594static void
2595reconnectTorrent( Torrent * t )
2596{
2597    static time_t prevTime = 0;
2598    static int    newConnectionsThisSecond = 0;
2599    const time_t  now = time( NULL );
2600
2601    if( prevTime != now )
2602    {
2603        prevTime = now;
2604        newConnectionsThisSecond = 0;
2605    }
2606
2607    if( !t->isRunning )
2608    {
2609        removeAllPeers( t );
2610    }
2611    else
2612    {
2613        int i;
2614        int canCloseCount;
2615        int mustCloseCount;
2616        int candidateCount;
2617        int maxCandidates;
2618        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2619        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2620        struct peer_atom ** candidates = getPeerCandidates( t, now, &candidateCount );
2621
2622        tordbg( t, "reconnect pulse for [%s]: "
2623                   "%d must-close connections, "
2624                   "%d can-close connections, "
2625                   "%d connection candidates, "
2626                   "%d atoms, "
2627                   "max per pulse is %d",
2628                   tr_torrentName( t->tor ),
2629                   mustCloseCount,
2630                   canCloseCount,
2631                   candidateCount,
2632                   tr_ptrArraySize( &t->pool ),
2633                   MAX_RECONNECTIONS_PER_PULSE );
2634
2635        /* disconnect the really bad peers */
2636        for( i=0; i<mustCloseCount; ++i )
2637            closePeer( t, mustClose[i] );
2638
2639        /* decide how many peers can we try to add in this pass */
2640        maxCandidates = candidateCount;
2641        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2642        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2643        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2644
2645        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2646        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2647            closePeer( t, canClose[i] );
2648
2649        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2650                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2651                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2652                   candidateCount,
2653                   MAX_RECONNECTIONS_PER_PULSE,
2654                   getPeerCount( t ),
2655                   getMaxPeerCount( t->tor ),
2656                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2657
2658        /* add some new ones */
2659        for( i=0; i<maxCandidates; ++i )
2660        {
2661            tr_peerMgr        * mgr = t->manager;
2662            struct peer_atom  * atom = candidates[i];
2663            tr_peerIo         * io;
2664
2665            tordbg( t, "Starting an OUTGOING connection with %s",
2666                   tr_atomAddrStr( atom ) );
2667
2668            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2669
2670            if( io == NULL )
2671            {
2672                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2673                        tr_atomAddrStr( atom ) );
2674                atom->myflags |= MYFLAG_UNREACHABLE;
2675            }
2676            else
2677            {
2678                tr_handshake * handshake = tr_handshakeNew( io,
2679                                                            mgr->session->encryptionMode,
2680                                                            myHandshakeDoneCB,
2681                                                            mgr );
2682
2683                assert( tr_peerIoGetTorrentHash( io ) );
2684
2685                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2686
2687                ++newConnectionsThisSecond;
2688
2689                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2690                                         handshakeCompare );
2691            }
2692
2693            atom->time = now;
2694        }
2695
2696        /* cleanup */
2697        tr_free( candidates );
2698        tr_free( mustClose );
2699        tr_free( canClose );
2700    }
2701}
2702
2703struct peer_liveliness
2704{
2705    tr_peer * peer;
2706    void * clientData;
2707    time_t pieceDataTime;
2708    time_t time;
2709    int speed;
2710    tr_bool doPurge;
2711};
2712
2713static int
2714comparePeerLiveliness( const void * va, const void * vb )
2715{
2716    const struct peer_liveliness * a = va;
2717    const struct peer_liveliness * b = vb;
2718
2719    if( a->doPurge != b->doPurge )
2720        return a->doPurge ? 1 : -1;
2721
2722    if( a->speed != b->speed ) /* faster goes first */
2723        return a->speed > b->speed ? -1 : 1;
2724
2725    /* the one to give us data more recently goes first */
2726    if( a->pieceDataTime != b->pieceDataTime )
2727        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2728
2729    /* the one we connected to most recently goes first */
2730    if( a->time != b->time )
2731        return a->time > b->time ? -1 : 1;
2732
2733    return 0;
2734}
2735
2736static int
2737comparePeerLivelinessReverse( const void * va, const void * vb )
2738{
2739    return -comparePeerLiveliness (va, vb);
2740}
2741
2742static void
2743sortPeersByLivelinessImpl( tr_peer  ** peers,
2744                           void     ** clientData,
2745                           int         n,
2746                           uint64_t    now,
2747                           int (*compare) ( const void *va, const void *vb ) )
2748{
2749    int i;
2750    struct peer_liveliness *lives, *l;
2751
2752    /* build a sortable array of peer + extra info */
2753    lives = l = tr_new0( struct peer_liveliness, n );
2754    for( i=0; i<n; ++i, ++l )
2755    {
2756        tr_peer * p = peers[i];
2757        l->peer = p;
2758        l->doPurge = p->doPurge;
2759        l->pieceDataTime = p->atom->piece_data_time;
2760        l->time = p->atom->time;
2761        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2762                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2763        if( clientData )
2764            l->clientData = clientData[i];
2765    }
2766
2767    /* sort 'em */
2768    assert( n == ( l - lives ) );
2769    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2770
2771    /* build the peer array */
2772    for( i=0, l=lives; i<n; ++i, ++l ) {
2773        peers[i] = l->peer;
2774        if( clientData )
2775            clientData[i] = l->clientData;
2776    }
2777    assert( n == ( l - lives ) );
2778
2779    /* cleanup */
2780    tr_free( lives );
2781}
2782
2783static void
2784sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2785{
2786    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2787}
2788
2789static void
2790sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2791{
2792    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2793}
2794
2795
2796static void
2797enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2798{
2799    int n = tr_ptrArraySize( &t->peers );
2800    const int max = tr_torrentGetPeerLimit( t->tor );
2801    if( n > max )
2802    {
2803        void * base = tr_ptrArrayBase( &t->peers );
2804        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2805        sortPeersByLiveliness( peers, NULL, n, now );
2806        while( n > max )
2807            closePeer( t, peers[--n] );
2808        tr_free( peers );
2809    }
2810}
2811
2812static void
2813enforceSessionPeerLimit( tr_session * session, uint64_t now )
2814{
2815    int n = 0;
2816    tr_torrent * tor = NULL;
2817    const int max = tr_sessionGetPeerLimit( session );
2818
2819    /* count the total number of peers */
2820    while(( tor = tr_torrentNext( session, tor )))
2821        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2822
2823    /* if there are too many, prune out the worst */
2824    if( n > max )
2825    {
2826        tr_peer ** peers = tr_new( tr_peer*, n );
2827        Torrent ** torrents = tr_new( Torrent*, n );
2828
2829        /* populate the peer array */
2830        n = 0;
2831        tor = NULL;
2832        while(( tor = tr_torrentNext( session, tor ))) {
2833            int i;
2834            Torrent * t = tor->torrentPeers;
2835            const int tn = tr_ptrArraySize( &t->peers );
2836            for( i=0; i<tn; ++i, ++n ) {
2837                peers[n] = tr_ptrArrayNth( &t->peers, i );
2838                torrents[n] = t;
2839            }
2840        }
2841
2842        /* sort 'em */
2843        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2844
2845        /* cull out the crappiest */
2846        while( n-- > max )
2847            closePeer( torrents[n], peers[n] );
2848
2849        /* cleanup */
2850        tr_free( torrents );
2851        tr_free( peers );
2852    }
2853}
2854
2855
2856static int
2857reconnectPulse( void * vmgr )
2858{
2859    tr_torrent * tor;
2860    tr_peerMgr * mgr = vmgr;
2861    uint64_t now;
2862    managerLock( mgr );
2863
2864    now = tr_date( );
2865
2866    /* if we're over the per-torrent peer limits, cull some peers */
2867    tor = NULL;
2868    while(( tor = tr_torrentNext( mgr->session, tor )))
2869        if( tor->isRunning )
2870            enforceTorrentPeerLimit( tor->torrentPeers, now );
2871
2872    /* if we're over the per-session peer limits, cull some peers */
2873    enforceSessionPeerLimit( mgr->session, now );
2874
2875    tor = NULL;
2876    while(( tor = tr_torrentNext( mgr->session, tor )))
2877        if( tor->isRunning )
2878            reconnectTorrent( tor->torrentPeers );
2879
2880    managerUnlock( mgr );
2881    return TRUE;
2882}
2883
2884/****
2885*****
2886*****  BANDWIDTH ALLOCATION
2887*****
2888****/
2889
2890static void
2891pumpAllPeers( tr_peerMgr * mgr )
2892{
2893    tr_torrent * tor = NULL;
2894
2895    while(( tor = tr_torrentNext( mgr->session, tor )))
2896    {
2897        int j;
2898        Torrent * t = tor->torrentPeers;
2899
2900        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2901        {
2902            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2903            tr_peerMsgsPulse( peer->msgs );
2904        }
2905    }
2906}
2907
2908static int
2909bandwidthPulse( void * vmgr )
2910{
2911    tr_torrent * tor = NULL;
2912    tr_peerMgr * mgr = vmgr;
2913    managerLock( mgr );
2914
2915    /* FIXME: this next line probably isn't necessary... */
2916    pumpAllPeers( mgr );
2917
2918    /* allocate bandwidth to the peers */
2919    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2920    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2921
2922    /* possibly stop torrents that have seeded enough */
2923    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2924        if( tor->needsSeedRatioCheck ) {
2925            tor->needsSeedRatioCheck = FALSE;
2926            tr_torrentCheckSeedRatio( tor );
2927        }
2928    }
2929
2930    /* run the completeness check for any torrents that need it */
2931    tor = NULL;
2932    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2933        if( tor->torrentPeers->needsCompletenessCheck ) {
2934            tor->torrentPeers->needsCompletenessCheck  = FALSE;
2935            tr_torrentRecheckCompleteness( tor );
2936        }
2937    }
2938
2939    /* possibly stop torrents that have an error */
2940    tor = NULL;
2941    while(( tor = tr_torrentNext( mgr->session, tor )))
2942        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2943            tr_torrentStop( tor );
2944
2945    managerUnlock( mgr );
2946    return TRUE;
2947}
2948
2949/***
2950****
2951***/
2952
2953static int
2954compareAtomPtrsByAddress( const void * va, const void *vb )
2955{
2956    const struct peer_atom * a = * (const struct peer_atom**) va;
2957    const struct peer_atom * b = * (const struct peer_atom**) vb;
2958
2959    assert( tr_isAtom( a ) );
2960    assert( tr_isAtom( b ) );
2961
2962    return tr_compareAddresses( &a->addr, &b->addr );
2963}
2964
2965static time_t tr_now = 0;
2966
2967/* best come first, worst go last */
2968static int
2969compareAtomPtrsByShelfDate( const void * va, const void *vb )
2970{
2971    time_t atime;
2972    time_t btime;
2973    const struct peer_atom * a = * (const struct peer_atom**) va;
2974    const struct peer_atom * b = * (const struct peer_atom**) vb;
2975    const int data_time_cutoff_secs = 60 * 60;
2976
2977    assert( tr_isAtom( a ) );
2978    assert( tr_isAtom( b ) );
2979
2980    /* primary key: the last piece data time *if* it was within the last hour */
2981    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
2982    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
2983    if( atime != btime )
2984        return atime > btime ? -1 : 1;
2985
2986    /* secondary key: shelf date. */
2987    if( a->shelf_date != b->shelf_date )
2988        return a->shelf_date > b->shelf_date ? -1 : 1;
2989
2990    return 0;
2991}
2992
2993static int
2994getMaxAtomCount( const tr_torrent * tor )
2995{
2996    /* FIXME: this curve should be smoother... */
2997    const int n = tor->maxConnectedPeers;
2998    if( n >= 200 ) return n * 1.5;
2999    if( n >= 100 ) return n * 2;
3000    if( n >=  50 ) return n * 3;
3001    if( n >=  20 ) return n * 5;
3002    return n * 10;
3003}
3004
3005static int
3006atomPulse( void * vmgr )
3007{
3008    tr_torrent * tor = NULL;
3009    tr_peerMgr * mgr = vmgr;
3010    managerLock( mgr );
3011
3012    while(( tor = tr_torrentNext( mgr->session, tor )))
3013    {
3014        int atomCount;
3015        Torrent * t = tor->torrentPeers;
3016        const int maxAtomCount = getMaxAtomCount( tor );
3017        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3018
3019        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3020        {
3021            int i;
3022            int keepCount = 0;
3023            int testCount = 0;
3024            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3025            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3026            tordbg( t, "max atom count is %d; have %d.. pruning\n", maxAtomCount, atomCount );
3027
3028            /* keep the ones that are in use */
3029            for( i=0; i<atomCount; ++i ) {
3030                struct peer_atom * atom = atoms[i];
3031                if( peerIsInUse( t, &atom->addr ) )
3032                    keep[keepCount++] = atom;
3033                else
3034                    test[testCount++] = atom;
3035            }
3036
3037            /* if there's room, keep the best of what's left */
3038            i = 0;
3039            if( keepCount < maxAtomCount ) {
3040                tr_now = time( NULL );
3041                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3042                while( i<testCount && keepCount<maxAtomCount )
3043                    keep[keepCount++] = test[i++];
3044            }
3045
3046            /* free the culled atoms */
3047            while( i<testCount )
3048                tr_free( test[i++] );
3049
3050            /* rebuild Torrent.pool with what's left */
3051            tr_ptrArrayDestruct( &t->pool, NULL );
3052            t->pool = TR_PTR_ARRAY_INIT;
3053            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3054            for( i=0; i<keepCount; ++i ) {
3055                if( i+1<keepCount )
3056                    assert( tr_compareAddresses( &keep[i]->addr, &keep[i+1]->addr ) < 0 );
3057                tr_ptrArrayAppend( &t->pool, keep[i] );
3058            }
3059
3060            /* cleanup */
3061            tr_free( test );
3062            tr_free( keep );
3063        }
3064    }
3065
3066    managerUnlock( mgr );
3067    return TRUE;
3068}
Note: See TracBrowser for help on using the repository browser.