source: trunk/libtransmission/peer-mgr.c @ 9633

Last change on this file since 9633 was 9633, checked in by charles, 13 years ago

(trunk libT) #2607 "avoid unnecessary calls to getPeerCandidates(), wasted cycles in peerIsInUse()"

  • Property svn:keywords set to Date Rev Author Id
File size: 87.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9633 2009-11-29 18:35:02Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* minimum interval for refilling peers' request lists */
51    REFILL_PERIOD_MSEC = 400,
52
53    /* how frequently to reallocate bandwidth */
54    BANDWIDTH_PERIOD_MSEC = 500,
55
56    /* how frequently to age out old piece request lists */
57    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
58
59    /* how frequently to decide which peers live and die */
60    RECONNECT_PERIOD_MSEC = 500,
61
62    /* when many peers are available, keep idle ones this long */
63    MIN_UPLOAD_IDLE_SECS = ( 60 ),
64
65    /* when few peers are available, keep idle ones this long */
66    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
67
68    /* max # of peers to ask fer per torrent per reconnect pulse */
69    MAX_RECONNECTIONS_PER_PULSE = 8,
70
71    /* max number of peers to ask for per second overall.
72    * this throttle is to avoid overloading the router */
73    MAX_CONNECTIONS_PER_SECOND = 16,
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.myflags */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5
92};
93
94
95/**
96***
97**/
98
99enum
100{
101    UPLOAD_ONLY_UKNOWN,
102    UPLOAD_ONLY_YES,
103    UPLOAD_ONLY_NO
104};
105
106/**
107 * Peer information that should be kept even before we've connected and
108 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
109 * which ones would make good candidates for connecting to, and to watch out
110 * for banned peers.
111 *
112 * @see tr_peer
113 * @see tr_peermsgs
114 */
115struct peer_atom
116{
117    tr_peer   * peer;        /* will be NULL if not connected */
118    uint8_t     from;
119    uint8_t     flags;       /* these match the added_f flags */
120    uint8_t     myflags;     /* flags that aren't defined in added_f */
121    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
122    tr_port     port;
123    uint16_t    numFails;
124    tr_address  addr;
125    time_t      time;        /* when the peer's connection status last changed */
126    time_t      piece_data_time;
127
128    /* similar to a TTL field, but less rigid --
129     * if the swarm is small, the atom will be kept past this date. */
130    time_t      shelf_date;
131};
132
133static tr_bool
134tr_isAtom( const struct peer_atom * atom )
135{
136    return ( atom != NULL )
137        && ( atom->from < TR_PEER_FROM__MAX )
138        && ( tr_isAddress( &atom->addr ) );
139}
140
141static const char*
142tr_atomAddrStr( const struct peer_atom * atom )
143{
144    return tr_peerIoAddrStr( &atom->addr, atom->port );
145}
146
147struct block_request
148{
149    tr_block_index_t block;
150    tr_peer * peer;
151    time_t sentAt;
152};
153
154struct weighted_piece
155{
156    tr_piece_index_t index;
157    int16_t salt;
158    int16_t requestCount;
159};
160
161typedef struct tr_torrent_peers
162{
163    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
164    tr_ptrArray                pool; /* struct peer_atom */
165    tr_ptrArray                peers; /* tr_peer */
166    tr_ptrArray                webseeds; /* tr_webseed */
167
168    tr_torrent               * tor;
169    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
170    struct tr_peerMgr        * manager;
171
172    tr_bool                    isRunning;
173    tr_bool                    needsCompletenessCheck;
174
175    struct block_request     * requests;
176    int                        requestsSort;
177    int                        requestCount;
178    int                        requestAlloc;
179
180    struct weighted_piece    * pieces;
181    int                        piecesSort;
182    int                        pieceCount;
183
184    tr_bool                    isInEndgame;
185}
186Torrent;
187
188struct tr_peerMgr
189{
190    tr_session      * session;
191    tr_ptrArray       incomingHandshakes; /* tr_handshake */
192    tr_timer        * bandwidthTimer;
193    tr_timer        * rechokeTimer;
194    tr_timer        * reconnectTimer;
195    tr_timer        * refillUpkeepTimer;
196    tr_timer        * atomTimer;
197};
198
199#define tordbg( t, ... ) \
200    do { \
201        if( tr_deepLoggingIsActive( ) ) \
202            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
203    } while( 0 )
204
205#define dbgmsg( ... ) \
206    do { \
207        if( tr_deepLoggingIsActive( ) ) \
208            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
209    } while( 0 )
210
211/**
212***
213**/
214
215static TR_INLINE void
216managerLock( const struct tr_peerMgr * manager )
217{
218    tr_globalLock( manager->session );
219}
220
221static TR_INLINE void
222managerUnlock( const struct tr_peerMgr * manager )
223{
224    tr_globalUnlock( manager->session );
225}
226
227static TR_INLINE void
228torrentLock( Torrent * torrent )
229{
230    managerLock( torrent->manager );
231}
232
233static TR_INLINE void
234torrentUnlock( Torrent * torrent )
235{
236    managerUnlock( torrent->manager );
237}
238
239static TR_INLINE int
240torrentIsLocked( const Torrent * t )
241{
242    return tr_globalIsLocked( t->manager->session );
243}
244
245/**
246***
247**/
248
249static int
250handshakeCompareToAddr( const void * va, const void * vb )
251{
252    const tr_handshake * a = va;
253
254    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
255}
256
257static int
258handshakeCompare( const void * a, const void * b )
259{
260    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
261}
262
263static tr_handshake*
264getExistingHandshake( tr_ptrArray      * handshakes,
265                      const tr_address * addr )
266{
267    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
268}
269
270static int
271comparePeerAtomToAddress( const void * va, const void * vb )
272{
273    const struct peer_atom * a = va;
274
275    return tr_compareAddresses( &a->addr, vb );
276}
277
278static int
279compareAtomsByAddress( const void * va, const void * vb )
280{
281    const struct peer_atom * b = vb;
282
283    assert( tr_isAtom( b ) );
284
285    return comparePeerAtomToAddress( va, &b->addr );
286}
287
288/**
289***
290**/
291
292const tr_address *
293tr_peerAddress( const tr_peer * peer )
294{
295    return &peer->atom->addr;
296}
297
298static Torrent*
299getExistingTorrent( tr_peerMgr *    manager,
300                    const uint8_t * hash )
301{
302    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
303
304    return tor == NULL ? NULL : tor->torrentPeers;
305}
306
307static int
308peerCompare( const void * a, const void * b )
309{
310    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
311}
312
313static struct peer_atom*
314getExistingAtom( const Torrent    * t,
315                 const tr_address * addr )
316{
317    Torrent * tt = (Torrent*)t;
318    assert( torrentIsLocked( t ) );
319    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
320}
321
322static tr_bool
323peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
324{
325    Torrent * t = (Torrent*) ct;
326
327    assert( torrentIsLocked ( t ) );
328
329    return ( atom->peer != NULL )
330        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
331        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
332}
333
334static tr_peer*
335peerConstructor( struct peer_atom * atom )
336{
337    tr_peer * peer = tr_new0( tr_peer, 1 );
338    tr_bitsetConstructor( &peer->have, 0 );
339    peer->atom = atom;
340    atom->peer = peer;
341    return peer;
342}
343
344static tr_peer*
345getPeer( Torrent * torrent, struct peer_atom * atom )
346{
347    tr_peer * peer;
348
349    assert( torrentIsLocked( torrent ) );
350
351    peer = atom->peer;
352
353    if( peer == NULL )
354    {
355        peer = peerConstructor( atom );
356        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
357    }
358
359    return peer;
360}
361
362static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
363
364static void
365peerDestructor( Torrent * t, tr_peer * peer )
366{
367    assert( peer != NULL );
368
369    peerDeclinedAllRequests( t, peer );
370
371    if( peer->msgs != NULL )
372    {
373        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
374        tr_peerMsgsFree( peer->msgs );
375    }
376
377    tr_peerIoClear( peer->io );
378    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
379
380    tr_bitsetDestructor( &peer->have );
381    tr_bitfieldFree( peer->blame );
382    tr_free( peer->client );
383    peer->atom->peer = NULL;
384
385    tr_free( peer );
386}
387
388static void
389removePeer( Torrent * t, tr_peer * peer )
390{
391    tr_peer * removed;
392    struct peer_atom * atom = peer->atom;
393
394    assert( torrentIsLocked( t ) );
395    assert( atom );
396
397    atom->time = tr_time( );
398
399    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
400    assert( removed == peer );
401    peerDestructor( t, removed );
402}
403
404static void
405removeAllPeers( Torrent * t )
406{
407    while( !tr_ptrArrayEmpty( &t->peers ) )
408        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
409}
410
411static void
412torrentDestructor( void * vt )
413{
414    Torrent * t = vt;
415
416    assert( t );
417    assert( !t->isRunning );
418    assert( torrentIsLocked( t ) );
419    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
420    assert( tr_ptrArrayEmpty( &t->peers ) );
421
422    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
423    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
424    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
425    tr_ptrArrayDestruct( &t->peers, NULL );
426
427    tr_free( t->requests );
428    tr_free( t->pieces );
429    tr_free( t );
430}
431
432static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
433
434static Torrent*
435torrentConstructor( tr_peerMgr * manager,
436                    tr_torrent * tor )
437{
438    int       i;
439    Torrent * t;
440
441    t = tr_new0( Torrent, 1 );
442    t->manager = manager;
443    t->tor = tor;
444    t->pool = TR_PTR_ARRAY_INIT;
445    t->peers = TR_PTR_ARRAY_INIT;
446    t->webseeds = TR_PTR_ARRAY_INIT;
447    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
448    t->requests = 0;
449
450    for( i = 0; i < tor->info.webseedCount; ++i )
451    {
452        tr_webseed * w =
453            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
454        tr_ptrArrayAppend( &t->webseeds, w );
455    }
456
457    return t;
458}
459
460tr_peerMgr*
461tr_peerMgrNew( tr_session * session )
462{
463    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
464    m->session = session;
465    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
466    return m;
467}
468
469static void
470deleteTimers( struct tr_peerMgr * m )
471{
472    if( m->atomTimer )
473        tr_timerFree( &m->atomTimer );
474
475    if( m->bandwidthTimer )
476        tr_timerFree( &m->bandwidthTimer );
477
478    if( m->rechokeTimer )
479        tr_timerFree( &m->rechokeTimer );
480
481    if( m->reconnectTimer )
482        tr_timerFree( &m->reconnectTimer );
483
484    if( m->refillUpkeepTimer )
485        tr_timerFree( &m->refillUpkeepTimer );
486}
487
488void
489tr_peerMgrFree( tr_peerMgr * manager )
490{
491    managerLock( manager );
492
493    deleteTimers( manager );
494
495    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
496     * the item from manager->handshakes, so this is a little roundabout... */
497    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
498        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
499
500    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
501
502    managerUnlock( manager );
503    tr_free( manager );
504}
505
506static int
507clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
508{
509    if( !tr_torrentHasMetadata( tor ) )
510        return TRUE;
511
512    return peer->clientIsInterested && !peer->clientIsChoked;
513}
514
515static int
516clientIsUploadingTo( const tr_peer * peer )
517{
518    return peer->peerIsInterested && !peer->peerIsChoked;
519}
520
521/***
522****
523***/
524
525tr_bool
526tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
527                      const tr_address  * addr )
528{
529    tr_bool isSeed = FALSE;
530    const Torrent * t = tor->torrentPeers;
531    const struct peer_atom * atom = getExistingAtom( t, addr );
532
533    if( atom )
534        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
535
536    return isSeed;
537}
538
539/**
540***  REQUESTS
541***
542*** There are two data structures associated with managing block requests:
543***
544*** 1. Torrent::requests, an array of "struct block_request" which keeps
545***    track of which blocks have been requested, and when, and by which peers.
546***    This is list is used for (a) cancelling requests that have been pending
547***    for too long and (b) avoiding duplicate requests before endgame.
548***
549*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
550***    pieces that we want to request.  It's used to decide which pieces to
551***    return next when tr_peerMgrGetBlockRequests() is called.
552**/
553
554/**
555*** struct block_request
556**/
557
558enum
559{
560    REQ_UNSORTED,
561    REQ_SORTED_BY_BLOCK,
562    REQ_SORTED_BY_TIME
563};
564
565static int
566compareReqByBlock( const void * va, const void * vb )
567{
568    const struct block_request * a = va;
569    const struct block_request * b = vb;
570    if( a->block < b->block ) return -1;
571    if( a->block > b->block ) return 1;
572    return 0;
573}
574
575static int
576compareReqByTime( const void * va, const void * vb )
577{
578    const struct block_request * a = va;
579    const struct block_request * b = vb;
580    if( a->sentAt < b->sentAt ) return -1;
581    if( a->sentAt > b->sentAt ) return 1;
582    return 0;
583}
584
585static void
586requestListSort( Torrent * t, int mode )
587{
588    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
589
590    if( t->requestsSort != mode )
591    {
592        int(*compar)(const void *, const void *);
593
594        t->requestsSort = mode;
595
596        switch( mode ) {
597            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
598            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
599            default: assert( 0 && "unhandled" );
600        }
601
602        qsort( t->requests, t->requestCount,
603               sizeof( struct block_request ), compar );
604    }
605}
606
607static void
608requestListAdd( Torrent * t, const time_t now, tr_block_index_t block, tr_peer * peer )
609{
610    struct block_request key;
611
612    /* ensure enough room is available... */
613    if( t->requestCount + 1 >= t->requestAlloc )
614    {
615        const int CHUNK_SIZE = 128;
616        t->requestAlloc += CHUNK_SIZE;
617        t->requests = tr_renew( struct block_request,
618                                t->requests, t->requestAlloc );
619    }
620
621    /* populate the record we're inserting */
622    key.block = block;
623    key.peer = peer;
624    key.sentAt = now;
625
626    /* insert the request to our array... */
627    switch( t->requestsSort )
628    {
629        case REQ_UNSORTED:
630        case REQ_SORTED_BY_TIME:
631            t->requests[t->requestCount++] = key;
632            break;
633
634        case REQ_SORTED_BY_BLOCK: {
635            tr_bool exact;
636            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
637                                           sizeof( struct block_request ),
638                                           compareReqByBlock, &exact );
639            assert( !exact );
640            memmove( t->requests + pos + 1,
641                     t->requests + pos,
642                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
643            t->requests[pos] = key;
644            break;
645        }
646    }
647}
648
649static struct block_request *
650requestListLookup( Torrent * t, tr_block_index_t block )
651{
652    struct block_request key;
653    key.block = block;
654
655    requestListSort( t, REQ_SORTED_BY_BLOCK );
656
657    return bsearch( &key, t->requests, t->requestCount,
658                    sizeof( struct block_request ),
659                    compareReqByBlock );
660}
661
662static void
663requestListRemove( Torrent * t, tr_block_index_t block )
664{
665    const struct block_request * b = requestListLookup( t, block );
666    if( b != NULL )
667    {
668        const int pos = b - t->requests;
669        assert( pos < t->requestCount );
670        memmove( t->requests + pos,
671                 t->requests + pos + 1,
672                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
673    }
674}
675
676/**
677*** struct weighted_piece
678**/
679
680enum
681{
682    PIECES_UNSORTED,
683    PIECES_SORTED_BY_INDEX,
684    PIECES_SORTED_BY_WEIGHT
685};
686
687const tr_torrent * weightTorrent;
688
689/* we try to create a "weight" s.t. high-priority pieces come before others,
690 * and that partially-complete pieces come before empty ones. */
691static int
692comparePieceByWeight( const void * va, const void * vb )
693{
694    const struct weighted_piece * a = va;
695    const struct weighted_piece * b = vb;
696    int ia, ib, missing, pending;
697    const tr_torrent * tor = weightTorrent;
698
699    /* primary key: weight */
700    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
701    pending = a->requestCount;
702    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
703    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
704    pending = b->requestCount;
705    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
706    if( ia < ib ) return -1;
707    if( ia > ib ) return 1;
708
709    /* secondary key: higher priorities go first */
710    ia = tor->info.pieces[a->index].priority;
711    ib = tor->info.pieces[b->index].priority;
712    if( ia > ib ) return -1;
713    if( ia < ib ) return 1;
714
715    /* tertiary key: random */
716    return a->salt - b->salt;
717}
718
719static int
720comparePieceByIndex( const void * va, const void * vb )
721{
722    const struct weighted_piece * a = va;
723    const struct weighted_piece * b = vb;
724    if( a->index < b->index ) return -1;
725    if( a->index > b->index ) return 1;
726    return 0;
727}
728
729static void
730pieceListSort( Torrent * t, int mode )
731{
732    int(*compar)(const void *, const void *);
733
734    assert( mode==PIECES_SORTED_BY_INDEX
735         || mode==PIECES_SORTED_BY_WEIGHT );
736
737    if( t->piecesSort != mode )
738    {
739        t->piecesSort = mode;
740
741        switch( mode ) {
742            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
743            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
744            default: assert( 0 && "unhandled" );  break;
745        }
746
747        weightTorrent = t->tor;
748        qsort( t->pieces, t->pieceCount,
749               sizeof( struct weighted_piece ), compar );
750    }
751
752    /* Also, as long as we've got the pieces sorted by weight,
753     * let's also update t.isInEndgame */
754    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
755    {
756        tr_bool endgame = TRUE;
757
758        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
759        {
760            const tr_completion * cp = &t->tor->completion;
761            const struct weighted_piece * p = t->pieces;
762            const int pending = p->requestCount;
763            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
764            endgame = pending >= missing;
765        }
766
767        t->isInEndgame = endgame;
768    }
769}
770
771static struct weighted_piece *
772pieceListLookup( Torrent * t, tr_piece_index_t index )
773{
774    struct weighted_piece key;
775    key.index = index;
776
777    pieceListSort( t, PIECES_SORTED_BY_INDEX );
778
779    return bsearch( &key, t->pieces, t->pieceCount,
780                    sizeof( struct weighted_piece ),
781                    comparePieceByIndex );
782}
783
784static void
785pieceListRebuild( Torrent * t )
786{
787    if( !tr_torrentIsSeed( t->tor ) )
788    {
789        tr_piece_index_t i;
790        tr_piece_index_t * pool;
791        tr_piece_index_t poolCount = 0;
792        const tr_torrent * tor = t->tor;
793        const tr_info * inf = tr_torrentInfo( tor );
794        struct weighted_piece * pieces;
795        int pieceCount;
796
797        /* build the new list */
798        pool = tr_new( tr_piece_index_t, inf->pieceCount );
799        for( i=0; i<inf->pieceCount; ++i )
800            if( !inf->pieces[i].dnd )
801                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
802                    pool[poolCount++] = i;
803        pieceCount = poolCount;
804        pieces = tr_new0( struct weighted_piece, pieceCount );
805        for( i=0; i<poolCount; ++i ) {
806            struct weighted_piece * piece = pieces + i;
807            piece->index = pool[i];
808            piece->requestCount = 0;
809            piece->salt = tr_cryptoWeakRandInt( 255 );
810        }
811
812        /* if we already had a list of pieces, merge it into
813         * the new list so we don't lose its requestCounts */
814        if( t->pieces != NULL )
815        {
816            struct weighted_piece * o = t->pieces;
817            struct weighted_piece * oend = o + t->pieceCount;
818            struct weighted_piece * n = pieces;
819            struct weighted_piece * nend = n + pieceCount;
820
821            pieceListSort( t, PIECES_SORTED_BY_INDEX );
822
823            while( o!=oend && n!=nend ) {
824                if( o->index < n->index )
825                    ++o;
826                else if( o->index > n->index )
827                    ++n;
828                else
829                    *n++ = *o++;
830            }
831
832            tr_free( t->pieces );
833        }
834
835        t->pieces = pieces;
836        t->pieceCount = pieceCount;
837        t->piecesSort = PIECES_SORTED_BY_INDEX;
838
839        /* cleanup */
840        tr_free( pool );
841    }
842}
843
844static void
845pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
846{
847    struct weighted_piece * p = pieceListLookup( t, piece );
848
849    if( p != NULL )
850    {
851        const int pos = p - t->pieces;
852
853        memmove( t->pieces + pos,
854                 t->pieces + pos + 1,
855                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
856
857        if( t->pieceCount == 0 )
858        {
859            tr_free( t->pieces );
860            t->pieces = NULL;
861        }
862    }
863}
864
865static void
866pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
867{
868    struct weighted_piece * p;
869    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
870
871    if(( p = pieceListLookup( t, index )))
872        if( p->requestCount > 0 )
873            --p->requestCount;
874
875    /* note: this invalidates the weighted.piece.weight field,
876     * but that's OK since the call to pieceListLookup ensured
877     * that we were sorted by index anyway.. next time we resort
878     * by weight, pieceListSort() will update the weights */
879}
880
881/**
882***
883**/
884
885void
886tr_peerMgrRebuildRequests( tr_torrent * tor )
887{
888    assert( tr_isTorrent( tor ) );
889
890    pieceListRebuild( tor->torrentPeers );
891}
892
893void
894tr_peerMgrGetNextRequests( tr_torrent           * tor,
895                           tr_peer              * peer,
896                           int                    numwant,
897                           tr_block_index_t     * setme,
898                           int                  * numgot )
899{
900    int i;
901    int got;
902    Torrent * t;
903    struct weighted_piece * pieces;
904    const tr_bitset * have = &peer->have;
905    const time_t now = tr_time( );
906
907    /* sanity clause */
908    assert( tr_isTorrent( tor ) );
909    assert( numwant > 0 );
910
911    /* walk through the pieces and find blocks that should be requested */
912    got = 0;
913    t = tor->torrentPeers;
914
915    /* prep the pieces list */
916    if( t->pieces == NULL )
917        pieceListRebuild( t );
918    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
919
920    pieces = t->pieces;
921    for( i=0; i<t->pieceCount && got<numwant; ++i )
922    {
923        struct weighted_piece * p = pieces + i;
924
925        /* if the peer has this piece that we want... */
926        if( tr_bitsetHasFast( have, p->index ) )
927        {
928            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
929            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
930
931            for( ; b!=e && got<numwant; ++b )
932            {
933                struct block_request * breq;
934
935                /* don't request blocks we've already got */
936                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
937                    continue;
938
939                /* don't request blocks we've already requested (FIXME) */
940                breq = requestListLookup( t, b );
941                if( breq != NULL ) {
942                    assert( breq->peer != NULL );
943                    if( breq->peer == peer ) continue;
944                    if( !t->isInEndgame ) continue;
945                }
946
947                setme[got++] = b;
948
949                /* update our own tables */
950                if( breq == NULL )
951                    requestListAdd( t, now, b, peer );
952                ++p->requestCount;
953            }
954        }
955    }
956
957    /* We almost always change only a handful of pieces in the array.
958     * In these cases, it's cheaper to sort those changed pieces and merge,
959     * than qsort()ing the whole array again */
960    if( got > 0 )
961    {
962        struct weighted_piece * p;
963        struct weighted_piece * pieces;
964        struct weighted_piece * a = t->pieces;
965        struct weighted_piece * a_end = t->pieces + i;
966        struct weighted_piece * b = a_end;
967        struct weighted_piece * b_end = t->pieces + t->pieceCount;
968
969        /* rescore the pieces that we changed */
970        weightTorrent = t->tor;
971        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
972
973        /* allocate a new array */
974        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
975
976        /* merge the two sorted arrays into this new array */
977        weightTorrent = t->tor;
978        while( a!=a_end && b!=b_end )
979            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
980        while( a!=a_end ) *p++ = *a++;
981        while( b!=b_end ) *p++ = *b++;
982
983#if 0
984        /* make sure we did it right */
985        assert( p - pieces == t->pieceCount );
986        for( it=pieces; it+1<p; ++it )
987            assert( it->weight <= it[1].weight );
988#endif
989
990        /* update */
991        tr_free( t->pieces );
992        t->pieces = pieces;
993    }
994
995    *numgot = got;
996}
997
998tr_bool
999tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1000                          const tr_peer     * peer,
1001                          tr_block_index_t    block )
1002{
1003    const Torrent * t = tor->torrentPeers;
1004    const struct block_request * b = requestListLookup( (Torrent*)t, block );
1005    if( b == NULL ) return FALSE;
1006    if( b->peer == peer ) return TRUE;
1007    if( t->isInEndgame ) return TRUE;
1008    return FALSE;
1009}
1010
1011/* cancel requests that are too old */
1012static int
1013refillUpkeep( void * vmgr )
1014{
1015    time_t now;
1016    time_t too_old;
1017    tr_torrent * tor;
1018    tr_peerMgr * mgr = vmgr;
1019    managerLock( mgr );
1020
1021    now = tr_time( );
1022    too_old = now - REQUEST_TTL_SECS;
1023
1024    tor = NULL;
1025    while(( tor = tr_torrentNext( mgr->session, tor )))
1026    {
1027        Torrent * t = tor->torrentPeers;
1028        const int n = t->requestCount;
1029        if( n > 0 )
1030        {
1031            int keepCount = 0;
1032            int cancelCount = 0;
1033            struct block_request * keep = tr_new( struct block_request, n );
1034            struct block_request * cancel = tr_new( struct block_request, n );
1035            const struct block_request * it;
1036            const struct block_request * end;
1037
1038            for( it=t->requests, end=it+n; it!=end; ++it )
1039                if( it->sentAt <= too_old )
1040                    cancel[cancelCount++] = *it;
1041                else
1042                    keep[keepCount++] = *it;
1043
1044            /* prune out the ones we aren't keeping */
1045            tr_free( t->requests );
1046            t->requests = keep;
1047            t->requestCount = keepCount;
1048            t->requestAlloc = n;
1049
1050            /* send cancel messages for all the "cancel" ones */
1051            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1052                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1053                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1054
1055            /* decrement the pending request counts for the timed-out blocks */
1056            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1057                pieceListRemoveRequest( t, it->block );
1058
1059            /* cleanup loop */
1060            tr_free( cancel );
1061        }
1062    }
1063
1064    managerUnlock( mgr );
1065    return TRUE;
1066}
1067
1068static void
1069addStrike( Torrent * t, tr_peer * peer )
1070{
1071    tordbg( t, "increasing peer %s strike count to %d",
1072            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1073
1074    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1075    {
1076        struct peer_atom * atom = peer->atom;
1077        atom->myflags |= MYFLAG_BANNED;
1078        peer->doPurge = 1;
1079        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1080    }
1081}
1082
1083static void
1084gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1085{
1086    tr_torrent *   tor = t->tor;
1087    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1088
1089    tor->corruptCur += byteCount;
1090    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1091}
1092
1093static void
1094peerSuggestedPiece( Torrent            * t UNUSED,
1095                    tr_peer            * peer UNUSED,
1096                    tr_piece_index_t     pieceIndex UNUSED,
1097                    int                  isFastAllowed UNUSED )
1098{
1099#if 0
1100    assert( t );
1101    assert( peer );
1102    assert( peer->msgs );
1103
1104    /* is this a valid piece? */
1105    if(  pieceIndex >= t->tor->info.pieceCount )
1106        return;
1107
1108    /* don't ask for it if we've already got it */
1109    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1110        return;
1111
1112    /* don't ask for it if they don't have it */
1113    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1114        return;
1115
1116    /* don't ask for it if we're choked and it's not fast */
1117    if( !isFastAllowed && peer->clientIsChoked )
1118        return;
1119
1120    /* request the blocks that we don't have in this piece */
1121    {
1122        tr_block_index_t block;
1123        const tr_torrent * tor = t->tor;
1124        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1125        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1126
1127        for( block=start; block<end; ++block )
1128        {
1129            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1130            {
1131                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1132                const uint32_t length = tr_torBlockCountBytes( tor, block );
1133                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1134                incrementPieceRequests( t, pieceIndex );
1135            }
1136        }
1137    }
1138#endif
1139}
1140
1141static void
1142decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1143{
1144    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1145}
1146
1147static void
1148clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1149{
1150    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1151}
1152
1153static void
1154removeRequestFromTables( Torrent * t, tr_block_index_t block )
1155{
1156    requestListRemove( t, block );
1157    pieceListRemoveRequest( t, block );
1158}
1159
1160/* peer choked us, or maybe it disconnected.
1161   either way we need to remove all its requests */
1162static void
1163peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1164{
1165    int i, n;
1166    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1167
1168    for( i=n=0; i<t->requestCount; ++i )
1169        if( peer == t->requests[i].peer )
1170            blocks[n++] = t->requests[i].block;
1171
1172    for( i=0; i<n; ++i )
1173        removeRequestFromTables( t, blocks[i] );
1174
1175    tr_free( blocks );
1176}
1177
1178static void
1179peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1180{
1181    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1182    Torrent * t = vt;
1183    const tr_peer_event * e = vevent;
1184
1185    torrentLock( t );
1186
1187    switch( e->eventType )
1188    {
1189        case TR_PEER_UPLOAD_ONLY:
1190            /* update our atom */
1191            if( peer ) {
1192                if( e->uploadOnly ) {
1193                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1194                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1195                } else {
1196                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1197                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1198                }
1199            }
1200            break;
1201
1202        case TR_PEER_PEER_GOT_DATA:
1203        {
1204            const time_t now = tr_time( );
1205            tr_torrent * tor = t->tor;
1206
1207            tr_torrentSetActivityDate( tor, now );
1208
1209            if( e->wasPieceData ) {
1210                tor->uploadedCur += e->length;
1211                tr_torrentSetDirty( tor );
1212            }
1213
1214            /* update the stats */
1215            if( e->wasPieceData )
1216                tr_statsAddUploaded( tor->session, e->length );
1217
1218            /* update our atom */
1219            if( peer && e->wasPieceData )
1220                peer->atom->piece_data_time = now;
1221
1222            tor->needsSeedRatioCheck = TRUE;
1223
1224            break;
1225        }
1226
1227        case TR_PEER_CLIENT_GOT_REJ:
1228            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) );
1229            break;
1230
1231        case TR_PEER_CLIENT_GOT_CHOKE:
1232            peerDeclinedAllRequests( t, peer );
1233            break;
1234
1235        case TR_PEER_CLIENT_GOT_PORT:
1236            if( peer )
1237                peer->atom->port = e->port;
1238            break;
1239
1240        case TR_PEER_CLIENT_GOT_SUGGEST:
1241            if( peer )
1242                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1243            break;
1244
1245        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1246            if( peer )
1247                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1248            break;
1249
1250        case TR_PEER_CLIENT_GOT_DATA:
1251        {
1252            const time_t now = tr_time( );
1253            tr_torrent * tor = t->tor;
1254
1255            tr_torrentSetActivityDate( tor, now );
1256
1257            /* only add this to downloadedCur if we got it from a peer --
1258             * webseeds shouldn't count against our ratio.  As one tracker
1259             * admin put it, "Those pieces are downloaded directly from the
1260             * content distributor, not the peers, it is the tracker's job
1261             * to manage the swarms, not the web server and does not fit
1262             * into the jurisdiction of the tracker." */
1263            if( peer && e->wasPieceData ) {
1264                tor->downloadedCur += e->length;
1265                tr_torrentSetDirty( tor );
1266            }
1267
1268            /* update the stats */
1269            if( e->wasPieceData )
1270                tr_statsAddDownloaded( tor->session, e->length );
1271
1272            /* update our atom */
1273            if( peer && e->wasPieceData )
1274                peer->atom->piece_data_time = now;
1275
1276            break;
1277        }
1278
1279        case TR_PEER_PEER_PROGRESS:
1280        {
1281            if( peer )
1282            {
1283                struct peer_atom * atom = peer->atom;
1284                if( e->progress >= 1.0 ) {
1285                    tordbg( t, "marking peer %s as a seed",
1286                            tr_atomAddrStr( atom ) );
1287                    atom->flags |= ADDED_F_SEED_FLAG;
1288                }
1289            }
1290            break;
1291        }
1292
1293        case TR_PEER_CLIENT_GOT_BLOCK:
1294        {
1295            tr_torrent * tor = t->tor;
1296            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1297
1298            requestListRemove( t, block );
1299            pieceListRemoveRequest( t, block );
1300
1301            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1302            {
1303                tordbg( t, "we have this block already..." );
1304                clientGotUnwantedBlock( tor, block );
1305            }
1306            else
1307            {
1308                tr_cpBlockAdd( &tor->completion, block );
1309                tr_torrentSetDirty( tor );
1310
1311                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1312                {
1313                    const tr_piece_index_t p = e->pieceIndex;
1314                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1315
1316                    if( !ok )
1317                    {
1318                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1319                                   (unsigned long)p );
1320                    }
1321
1322                    tr_torrentSetHasPiece( tor, p, ok );
1323                    tr_torrentSetPieceChecked( tor, p, TRUE );
1324                    tr_peerMgrSetBlame( tor, p, ok );
1325
1326                    if( !ok )
1327                    {
1328                        gotBadPiece( t, p );
1329                    }
1330                    else
1331                    {
1332                        int i;
1333                        int peerCount;
1334                        tr_peer ** peers;
1335                        tr_file_index_t fileIndex;
1336
1337                        peerCount = tr_ptrArraySize( &t->peers );
1338                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1339                        for( i=0; i<peerCount; ++i )
1340                            tr_peerMsgsHave( peers[i]->msgs, p );
1341
1342                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1343                            const tr_file * file = &tor->info.files[fileIndex];
1344                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1345                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1346                                    tr_torrentFileCompleted( tor, fileIndex );
1347                        }
1348
1349                        pieceListRemovePiece( t, p );
1350                    }
1351                }
1352
1353                t->needsCompletenessCheck = TRUE;
1354            }
1355            break;
1356        }
1357
1358        case TR_PEER_ERROR:
1359            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1360            {
1361                /* some protocol error from the peer */
1362                peer->doPurge = 1;
1363                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1364                        tr_atomAddrStr( peer->atom ) );
1365            }
1366            else
1367            {
1368                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1369            }
1370            break;
1371
1372        default:
1373            assert( 0 );
1374    }
1375
1376    torrentUnlock( t );
1377}
1378
1379static int
1380getDefaultShelfLife( uint8_t from )
1381{
1382    /* in general, peers obtained from firsthand contact
1383     * are better than those from secondhand, etc etc */
1384    switch( from )
1385    {
1386        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1387        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1388        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1389        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1390        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1391        case TR_PEER_FROM_RESUME   : return 60 * 60;
1392        default                    : return 60 * 60;
1393    }
1394}
1395
1396
1397static void
1398ensureAtomExists( Torrent           * t,
1399                  const tr_address  * addr,
1400                  const tr_port       port,
1401                  const uint8_t       flags,
1402                  const uint8_t       from )
1403{
1404    assert( tr_isAddress( addr ) );
1405    assert( from < TR_PEER_FROM__MAX );
1406
1407    if( getExistingAtom( t, addr ) == NULL )
1408    {
1409        struct peer_atom * a;
1410        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1411
1412        a = tr_new0( struct peer_atom, 1 );
1413        a->addr = *addr;
1414        a->port = port;
1415        a->flags = flags;
1416        a->from = from;
1417        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1418        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1419
1420        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1421    }
1422}
1423
1424static int
1425getMaxPeerCount( const tr_torrent * tor )
1426{
1427    return tor->maxConnectedPeers;
1428}
1429
1430static int
1431getPeerCount( const Torrent * t )
1432{
1433    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1434}
1435
1436/* FIXME: this is kind of a mess. */
1437static tr_bool
1438myHandshakeDoneCB( tr_handshake  * handshake,
1439                   tr_peerIo     * io,
1440                   tr_bool         isConnected,
1441                   const uint8_t * peer_id,
1442                   void          * vmanager )
1443{
1444    tr_bool            ok = isConnected;
1445    tr_bool            success = FALSE;
1446    tr_port            port;
1447    const tr_address * addr;
1448    tr_peerMgr       * manager = vmanager;
1449    Torrent          * t;
1450    tr_handshake     * ours;
1451
1452    assert( io );
1453    assert( tr_isBool( ok ) );
1454
1455    t = tr_peerIoHasTorrentHash( io )
1456        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1457        : NULL;
1458
1459    if( tr_peerIoIsIncoming ( io ) )
1460        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1461                                        handshake, handshakeCompare );
1462    else if( t )
1463        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1464                                        handshake, handshakeCompare );
1465    else
1466        ours = handshake;
1467
1468    assert( ours );
1469    assert( ours == handshake );
1470
1471    if( t )
1472        torrentLock( t );
1473
1474    addr = tr_peerIoGetAddress( io, &port );
1475
1476    if( !ok || !t || !t->isRunning )
1477    {
1478        if( t )
1479        {
1480            struct peer_atom * atom = getExistingAtom( t, addr );
1481            if( atom )
1482                ++atom->numFails;
1483        }
1484    }
1485    else /* looking good */
1486    {
1487        struct peer_atom * atom;
1488
1489        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1490        atom = getExistingAtom( t, addr );
1491        atom->time = tr_time( );
1492        atom->piece_data_time = 0;
1493
1494        if( atom->myflags & MYFLAG_BANNED )
1495        {
1496            tordbg( t, "banned peer %s tried to reconnect",
1497                    tr_atomAddrStr( atom ) );
1498        }
1499        else if( tr_peerIoIsIncoming( io )
1500               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1501
1502        {
1503        }
1504        else
1505        {
1506            tr_peer * peer = atom->peer;
1507
1508            if( peer ) /* we already have this peer */
1509            {
1510            }
1511            else
1512            {
1513                peer = getPeer( t, atom );
1514                tr_free( peer->client );
1515
1516                if( !peer_id )
1517                    peer->client = NULL;
1518                else {
1519                    char client[128];
1520                    tr_clientForId( client, sizeof( client ), peer_id );
1521                    peer->client = tr_strdup( client );
1522                }
1523
1524                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1525                                                                balanced by our unref in peerDestructor()  */
1526                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1527                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1528
1529                success = TRUE;
1530            }
1531        }
1532    }
1533
1534    if( t )
1535        torrentUnlock( t );
1536
1537    return success;
1538}
1539
1540void
1541tr_peerMgrAddIncoming( tr_peerMgr * manager,
1542                       tr_address * addr,
1543                       tr_port      port,
1544                       int          socket )
1545{
1546    tr_session * session;
1547
1548    managerLock( manager );
1549
1550    assert( tr_isSession( manager->session ) );
1551    session = manager->session;
1552
1553    if( tr_sessionIsAddressBlocked( session, addr ) )
1554    {
1555        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1556        tr_netClose( session, socket );
1557    }
1558    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1559    {
1560        tr_netClose( session, socket );
1561    }
1562    else /* we don't have a connection to them yet... */
1563    {
1564        tr_peerIo *    io;
1565        tr_handshake * handshake;
1566
1567        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1568
1569        handshake = tr_handshakeNew( io,
1570                                     session->encryptionMode,
1571                                     myHandshakeDoneCB,
1572                                     manager );
1573
1574        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1575
1576        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1577                                 handshakeCompare );
1578    }
1579
1580    managerUnlock( manager );
1581}
1582
1583static tr_bool
1584tr_isPex( const tr_pex * pex )
1585{
1586    return pex && tr_isAddress( &pex->addr );
1587}
1588
1589void
1590tr_peerMgrAddPex( tr_torrent   *  tor,
1591                  uint8_t         from,
1592                  const tr_pex *  pex )
1593{
1594    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1595    {
1596        Torrent * t = tor->torrentPeers;
1597        managerLock( t->manager );
1598
1599        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1600            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1601                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1602
1603        managerUnlock( t->manager );
1604    }
1605}
1606
1607tr_pex *
1608tr_peerMgrCompactToPex( const void *    compact,
1609                        size_t          compactLen,
1610                        const uint8_t * added_f,
1611                        size_t          added_f_len,
1612                        size_t *        pexCount )
1613{
1614    size_t          i;
1615    size_t          n = compactLen / 6;
1616    const uint8_t * walk = compact;
1617    tr_pex *        pex = tr_new0( tr_pex, n );
1618
1619    for( i = 0; i < n; ++i )
1620    {
1621        pex[i].addr.type = TR_AF_INET;
1622        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1623        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1624        if( added_f && ( n == added_f_len ) )
1625            pex[i].flags = added_f[i];
1626    }
1627
1628    *pexCount = n;
1629    return pex;
1630}
1631
1632tr_pex *
1633tr_peerMgrCompact6ToPex( const void    * compact,
1634                         size_t          compactLen,
1635                         const uint8_t * added_f,
1636                         size_t          added_f_len,
1637                         size_t        * pexCount )
1638{
1639    size_t          i;
1640    size_t          n = compactLen / 18;
1641    const uint8_t * walk = compact;
1642    tr_pex *        pex = tr_new0( tr_pex, n );
1643
1644    for( i = 0; i < n; ++i )
1645    {
1646        pex[i].addr.type = TR_AF_INET6;
1647        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1648        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1649        if( added_f && ( n == added_f_len ) )
1650            pex[i].flags = added_f[i];
1651    }
1652
1653    *pexCount = n;
1654    return pex;
1655}
1656
1657tr_pex *
1658tr_peerMgrArrayToPex( const void * array,
1659                      size_t       arrayLen,
1660                      size_t      * pexCount )
1661{
1662    size_t          i;
1663    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1664    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1665    const uint8_t * walk = array;
1666    tr_pex        * pex = tr_new0( tr_pex, n );
1667
1668    for( i = 0 ; i < n ; i++ ) {
1669        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1670        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1671        pex[i].flags = 0x00;
1672        walk += sizeof( tr_address ) + 2;
1673    }
1674
1675    *pexCount = n;
1676    return pex;
1677}
1678
1679/**
1680***
1681**/
1682
1683void
1684tr_peerMgrSetBlame( tr_torrent     * tor,
1685                    tr_piece_index_t pieceIndex,
1686                    int              success )
1687{
1688    if( !success )
1689    {
1690        int        peerCount, i;
1691        Torrent *  t = tor->torrentPeers;
1692        tr_peer ** peers;
1693
1694        assert( torrentIsLocked( t ) );
1695
1696        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1697        for( i = 0; i < peerCount; ++i )
1698        {
1699            tr_peer * peer = peers[i];
1700            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1701            {
1702                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1703                        tr_atomAddrStr( peer->atom ),
1704                        pieceIndex, (int)peer->strikes + 1 );
1705                addStrike( t, peer );
1706            }
1707        }
1708    }
1709}
1710
1711int
1712tr_pexCompare( const void * va, const void * vb )
1713{
1714    const tr_pex * a = va;
1715    const tr_pex * b = vb;
1716    int i;
1717
1718    assert( tr_isPex( a ) );
1719    assert( tr_isPex( b ) );
1720
1721    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1722        return i;
1723
1724    if( a->port != b->port )
1725        return a->port < b->port ? -1 : 1;
1726
1727    return 0;
1728}
1729
1730#if 0
1731static int
1732peerPrefersCrypto( const tr_peer * peer )
1733{
1734    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1735        return TRUE;
1736
1737    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1738        return FALSE;
1739
1740    return tr_peerIoIsEncrypted( peer->io );
1741}
1742#endif
1743
1744/* better goes first */
1745static int
1746compareAtomsByUsefulness( const void * va, const void *vb )
1747{
1748    const struct peer_atom * a = * (const struct peer_atom**) va;
1749    const struct peer_atom * b = * (const struct peer_atom**) vb;
1750
1751    assert( tr_isAtom( a ) );
1752    assert( tr_isAtom( b ) );
1753
1754    if( a->piece_data_time != b->piece_data_time )
1755        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1756    if( a->from != b->from )
1757        return a->from < b->from ? -1 : 1;
1758    if( a->numFails != b->numFails )
1759        return a->numFails < b->numFails ? -1 : 1;
1760
1761    return 0;
1762}
1763
1764int
1765tr_peerMgrGetPeers( tr_torrent   * tor,
1766                    tr_pex      ** setme_pex,
1767                    uint8_t        af,
1768                    uint8_t        list_mode,
1769                    int            maxCount )
1770{
1771    int i;
1772    int n;
1773    int count = 0;
1774    int atomCount = 0;
1775    const Torrent * t = tor->torrentPeers;
1776    struct peer_atom ** atoms = NULL;
1777    tr_pex * pex;
1778    tr_pex * walk;
1779
1780    assert( tr_isTorrent( tor ) );
1781    assert( setme_pex != NULL );
1782    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1783    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1784
1785    managerLock( t->manager );
1786
1787    /**
1788    ***  build a list of atoms
1789    **/
1790
1791    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1792    {
1793        int i;
1794        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1795        atomCount = tr_ptrArraySize( &t->peers );
1796        atoms = tr_new( struct peer_atom *, atomCount );
1797        for( i=0; i<atomCount; ++i )
1798            atoms[i] = peers[i]->atom;
1799    }
1800    else /* TR_PEERS_ALL */
1801    {
1802        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1803        atomCount = tr_ptrArraySize( &t->pool );
1804        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1805    }
1806
1807    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1808
1809    /**
1810    ***  add the first N of them into our return list
1811    **/
1812
1813    n = MIN( atomCount, maxCount );
1814    pex = walk = tr_new0( tr_pex, n );
1815
1816    for( i=0; i<atomCount && count<n; ++i )
1817    {
1818        const struct peer_atom * atom = atoms[i];
1819        if( atom->addr.type == af )
1820        {
1821            assert( tr_isAddress( &atom->addr ) );
1822            walk->addr = atom->addr;
1823            walk->port = atom->port;
1824            walk->flags = atom->flags;
1825            ++count;
1826            ++walk;
1827        }
1828    }
1829
1830    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1831
1832    assert( ( walk - pex ) == count );
1833    *setme_pex = pex;
1834
1835    /* cleanup */
1836    tr_free( atoms );
1837    managerUnlock( t->manager );
1838    return count;
1839}
1840
1841static int atomPulse      ( void * vmgr );
1842static int bandwidthPulse ( void * vmgr );
1843static int rechokePulse   ( void * vmgr );
1844static int reconnectPulse ( void * vmgr );
1845
1846static void
1847ensureMgrTimersExist( struct tr_peerMgr * m )
1848{
1849    tr_session * s = m->session;
1850
1851    if( m->atomTimer == NULL )
1852        m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
1853
1854    if( m->bandwidthTimer == NULL )
1855        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1856
1857    if( m->rechokeTimer == NULL )
1858        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1859
1860    if( m->reconnectTimer == NULL )
1861        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1862
1863    if( m->refillUpkeepTimer == NULL )
1864        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1865}
1866
1867void
1868tr_peerMgrStartTorrent( tr_torrent * tor )
1869{
1870    Torrent * t = tor->torrentPeers;
1871
1872    assert( t != NULL );
1873    managerLock( t->manager );
1874    ensureMgrTimersExist( t->manager );
1875
1876    t->isRunning = TRUE;
1877
1878    rechokePulse( t->manager );
1879    managerUnlock( t->manager );
1880}
1881
1882static void
1883stopTorrent( Torrent * t )
1884{
1885    int i, n;
1886
1887    assert( torrentIsLocked( t ) );
1888
1889    t->isRunning = FALSE;
1890
1891    /* disconnect the peers. */
1892    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1893        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1894    tr_ptrArrayClear( &t->peers );
1895
1896    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1897     * which removes the handshake from t->outgoingHandshakes... */
1898    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1899        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1900}
1901
1902void
1903tr_peerMgrStopTorrent( tr_torrent * tor )
1904{
1905    Torrent * t = tor->torrentPeers;
1906
1907    managerLock( t->manager );
1908
1909    stopTorrent( t );
1910
1911    managerUnlock( t->manager );
1912}
1913
1914void
1915tr_peerMgrAddTorrent( tr_peerMgr * manager,
1916                      tr_torrent * tor )
1917{
1918    managerLock( manager );
1919
1920    assert( tor );
1921    assert( tor->torrentPeers == NULL );
1922
1923    tor->torrentPeers = torrentConstructor( manager, tor );
1924
1925    managerUnlock( manager );
1926}
1927
1928void
1929tr_peerMgrRemoveTorrent( tr_torrent * tor )
1930{
1931    tr_torrentLock( tor );
1932
1933    stopTorrent( tor->torrentPeers );
1934    torrentDestructor( tor->torrentPeers );
1935
1936    tr_torrentUnlock( tor );
1937}
1938
1939void
1940tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1941                               int8_t           * tab,
1942                               unsigned int       tabCount )
1943{
1944    tr_piece_index_t   i;
1945    const Torrent *    t;
1946    float              interval;
1947    tr_bool            isSeed;
1948    int                peerCount;
1949    const tr_peer **   peers;
1950    tr_torrentLock( tor );
1951
1952    t = tor->torrentPeers;
1953    tor = t->tor;
1954    interval = tor->info.pieceCount / (float)tabCount;
1955    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1956    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1957    peerCount = tr_ptrArraySize( &t->peers );
1958
1959    memset( tab, 0, tabCount );
1960
1961    for( i = 0; tor && i < tabCount; ++i )
1962    {
1963        const int piece = i * interval;
1964
1965        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1966            tab[i] = -1;
1967        else if( peerCount ) {
1968            int j;
1969            for( j = 0; j < peerCount; ++j )
1970                if( tr_bitsetHas( &peers[j]->have, i ) )
1971                    ++tab[i];
1972        }
1973    }
1974
1975    tr_torrentUnlock( tor );
1976}
1977
1978/* Returns the pieces that are available from peers */
1979tr_bitfield*
1980tr_peerMgrGetAvailable( const tr_torrent * tor )
1981{
1982    int i;
1983    int peerCount;
1984    Torrent * t = tor->torrentPeers;
1985    const tr_peer ** peers;
1986    tr_bitfield * pieces;
1987    managerLock( t->manager );
1988
1989    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1990    peerCount = tr_ptrArraySize( &t->peers );
1991    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1992    for( i=0; i<peerCount; ++i )
1993        tr_bitsetOr( pieces, &peers[i]->have );
1994
1995    managerUnlock( t->manager );
1996    return pieces;
1997}
1998
1999void
2000tr_peerMgrTorrentStats( tr_torrent       * tor,
2001                        int              * setmePeersKnown,
2002                        int              * setmePeersConnected,
2003                        int              * setmeSeedsConnected,
2004                        int              * setmeWebseedsSendingToUs,
2005                        int              * setmePeersSendingToUs,
2006                        int              * setmePeersGettingFromUs,
2007                        int              * setmePeersFrom )
2008{
2009    int i, size;
2010    const Torrent * t = tor->torrentPeers;
2011    const tr_peer ** peers;
2012    const tr_webseed ** webseeds;
2013
2014    managerLock( t->manager );
2015
2016    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2017    size = tr_ptrArraySize( &t->peers );
2018
2019    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2020    *setmePeersConnected       = 0;
2021    *setmeSeedsConnected       = 0;
2022    *setmePeersGettingFromUs   = 0;
2023    *setmePeersSendingToUs     = 0;
2024    *setmeWebseedsSendingToUs  = 0;
2025
2026    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2027        setmePeersFrom[i] = 0;
2028
2029    for( i=0; i<size; ++i )
2030    {
2031        const tr_peer * peer = peers[i];
2032        const struct peer_atom * atom = peer->atom;
2033
2034        if( peer->io == NULL ) /* not connected */
2035            continue;
2036
2037        ++*setmePeersConnected;
2038
2039        ++setmePeersFrom[atom->from];
2040
2041        if( clientIsDownloadingFrom( tor, peer ) )
2042            ++*setmePeersSendingToUs;
2043
2044        if( clientIsUploadingTo( peer ) )
2045            ++*setmePeersGettingFromUs;
2046
2047        if( atom->flags & ADDED_F_SEED_FLAG )
2048            ++*setmeSeedsConnected;
2049    }
2050
2051    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2052    size = tr_ptrArraySize( &t->webseeds );
2053    for( i=0; i<size; ++i )
2054        if( tr_webseedIsActive( webseeds[i] ) )
2055            ++*setmeWebseedsSendingToUs;
2056
2057    managerUnlock( t->manager );
2058}
2059
2060float
2061tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2062{
2063    int i;
2064    float tmp;
2065    float ret = 0;
2066
2067    const Torrent * t = tor->torrentPeers;
2068    const int n = tr_ptrArraySize( &t->webseeds );
2069    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2070
2071    for( i=0; i<n; ++i )
2072        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2073            ret += tmp;
2074
2075    return ret;
2076}
2077
2078
2079float*
2080tr_peerMgrWebSpeeds( const tr_torrent * tor )
2081{
2082    const Torrent * t = tor->torrentPeers;
2083    const tr_webseed ** webseeds;
2084    int i;
2085    int webseedCount;
2086    float * ret;
2087    uint64_t now;
2088
2089    assert( t->manager );
2090    managerLock( t->manager );
2091
2092    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2093    webseedCount = tr_ptrArraySize( &t->webseeds );
2094    assert( webseedCount == tor->info.webseedCount );
2095    ret = tr_new0( float, webseedCount );
2096    now = tr_date( );
2097
2098    for( i=0; i<webseedCount; ++i )
2099        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2100            ret[i] = -1.0;
2101
2102    managerUnlock( t->manager );
2103    return ret;
2104}
2105
2106double
2107tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2108{
2109    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2110}
2111
2112
2113struct tr_peer_stat *
2114tr_peerMgrPeerStats( const tr_torrent    * tor,
2115                     int                 * setmeCount )
2116{
2117    int i, size;
2118    const Torrent * t = tor->torrentPeers;
2119    const tr_peer ** peers;
2120    tr_peer_stat * ret;
2121    uint64_t now;
2122
2123    assert( t->manager );
2124    managerLock( t->manager );
2125
2126    size = tr_ptrArraySize( &t->peers );
2127    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2128    ret = tr_new0( tr_peer_stat, size );
2129    now = tr_date( );
2130
2131    for( i=0; i<size; ++i )
2132    {
2133        char *                   pch;
2134        const tr_peer *          peer = peers[i];
2135        const struct peer_atom * atom = peer->atom;
2136        tr_peer_stat *           stat = ret + i;
2137
2138        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2139        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2140                   sizeof( stat->client ) );
2141        stat->port               = ntohs( peer->atom->port );
2142        stat->from               = atom->from;
2143        stat->progress           = peer->progress;
2144        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2145        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2146        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2147        stat->peerIsChoked       = peer->peerIsChoked;
2148        stat->peerIsInterested   = peer->peerIsInterested;
2149        stat->clientIsChoked     = peer->clientIsChoked;
2150        stat->clientIsInterested = peer->clientIsInterested;
2151        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
2152        stat->isDownloadingFrom  = clientIsDownloadingFrom( tor, peer );
2153        stat->isUploadingTo      = clientIsUploadingTo( peer );
2154        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2155
2156        pch = stat->flagStr;
2157        if( t->optimistic == peer ) *pch++ = 'O';
2158        if( stat->isDownloadingFrom ) *pch++ = 'D';
2159        else if( stat->clientIsInterested ) *pch++ = 'd';
2160        if( stat->isUploadingTo ) *pch++ = 'U';
2161        else if( stat->peerIsInterested ) *pch++ = 'u';
2162        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2163        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2164        if( stat->isEncrypted ) *pch++ = 'E';
2165        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2166        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2167        if( stat->isIncoming ) *pch++ = 'I';
2168        *pch = '\0';
2169    }
2170
2171    *setmeCount = size;
2172
2173    managerUnlock( t->manager );
2174    return ret;
2175}
2176
2177/**
2178***
2179**/
2180
2181struct ChokeData
2182{
2183    tr_bool         doUnchoke;
2184    tr_bool         isInterested;
2185    tr_bool         isChoked;
2186    int             rate;
2187    tr_peer *       peer;
2188};
2189
2190static int
2191compareChoke( const void * va,
2192              const void * vb )
2193{
2194    const struct ChokeData * a = va;
2195    const struct ChokeData * b = vb;
2196
2197    if( a->rate != b->rate ) /* prefer higher overall speeds */
2198        return a->rate > b->rate ? -1 : 1;
2199
2200    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2201        return a->isChoked ? 1 : -1;
2202
2203    return 0;
2204}
2205
2206static int
2207isNew( const tr_peer * peer )
2208{
2209    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2210}
2211
2212static int
2213isSame( const tr_peer * peer )
2214{
2215    return peer && peer->client && strstr( peer->client, "Transmission" );
2216}
2217
2218/**
2219***
2220**/
2221
2222static void
2223rechokeTorrent( Torrent * t, const uint64_t now )
2224{
2225    int i, size, unchokedInterested;
2226    const int peerCount = tr_ptrArraySize( &t->peers );
2227    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2228    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2229    const tr_session * session = t->manager->session;
2230    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2231
2232    assert( torrentIsLocked( t ) );
2233
2234    /* sort the peers by preference and rate */
2235    for( i = 0, size = 0; i < peerCount; ++i )
2236    {
2237        tr_peer * peer = peers[i];
2238        struct peer_atom * atom = peer->atom;
2239
2240        if( peer->progress >= 1.0 ) /* choke all seeds */
2241        {
2242            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2243        }
2244        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2245        {
2246            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2247        }
2248        else if( chokeAll ) /* choke everyone if we're not uploading */
2249        {
2250            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2251        }
2252        else
2253        {
2254            struct ChokeData * n = &choke[size++];
2255            n->peer         = peer;
2256            n->isInterested = peer->peerIsInterested;
2257            n->isChoked     = peer->peerIsChoked;
2258            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2259        }
2260    }
2261
2262    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2263
2264    /**
2265     * Reciprocation and number of uploads capping is managed by unchoking
2266     * the N peers which have the best upload rate and are interested.
2267     * This maximizes the client's download rate. These N peers are
2268     * referred to as downloaders, because they are interested in downloading
2269     * from the client.
2270     *
2271     * Peers which have a better upload rate (as compared to the downloaders)
2272     * but aren't interested get unchoked. If they become interested, the
2273     * downloader with the worst upload rate gets choked. If a client has
2274     * a complete file, it uses its upload rate rather than its download
2275     * rate to decide which peers to unchoke.
2276     */
2277    unchokedInterested = 0;
2278    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2279        choke[i].doUnchoke = 1;
2280        if( choke[i].isInterested )
2281            ++unchokedInterested;
2282    }
2283
2284    /* optimistic unchoke */
2285    if( i < size )
2286    {
2287        int n;
2288        struct ChokeData * c;
2289        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2290
2291        for( ; i<size; ++i )
2292        {
2293            if( choke[i].isInterested )
2294            {
2295                const tr_peer * peer = choke[i].peer;
2296                int x = 1, y;
2297                if( isNew( peer ) ) x *= 3;
2298                if( isSame( peer ) ) x *= 3;
2299                for( y=0; y<x; ++y )
2300                    tr_ptrArrayAppend( &randPool, &choke[i] );
2301            }
2302        }
2303
2304        if(( n = tr_ptrArraySize( &randPool )))
2305        {
2306            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2307            c->doUnchoke = 1;
2308            t->optimistic = c->peer;
2309        }
2310
2311        tr_ptrArrayDestruct( &randPool, NULL );
2312    }
2313
2314    for( i=0; i<size; ++i )
2315        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2316
2317    /* cleanup */
2318    tr_free( choke );
2319}
2320
2321static int
2322rechokePulse( void * vmgr )
2323{
2324    uint64_t now;
2325    tr_torrent * tor = NULL;
2326    tr_peerMgr * mgr = vmgr;
2327    managerLock( mgr );
2328
2329    now = tr_date( );
2330    while(( tor = tr_torrentNext( mgr->session, tor )))
2331        if( tor->isRunning )
2332            rechokeTorrent( tor->torrentPeers, now );
2333
2334    managerUnlock( mgr );
2335    return TRUE;
2336}
2337
2338/***
2339****
2340****  Life and Death
2341****
2342***/
2343
2344typedef enum
2345{
2346    TR_CAN_KEEP,
2347    TR_CAN_CLOSE,
2348    TR_MUST_CLOSE,
2349}
2350tr_close_type_t;
2351
2352static tr_close_type_t
2353shouldPeerBeClosed( const Torrent    * t,
2354                    const tr_peer    * peer,
2355                    int                peerCount,
2356                    const time_t       now )
2357{
2358    const tr_torrent *       tor = t->tor;
2359    const struct peer_atom * atom = peer->atom;
2360
2361    /* if it's marked for purging, close it */
2362    if( peer->doPurge )
2363    {
2364        tordbg( t, "purging peer %s because its doPurge flag is set",
2365                tr_atomAddrStr( atom ) );
2366        return TR_MUST_CLOSE;
2367    }
2368
2369    /* if we're seeding and the peer has everything we have,
2370     * and enough time has passed for a pex exchange, then disconnect */
2371    if( tr_torrentIsSeed( tor ) )
2372    {
2373        int peerHasEverything;
2374        if( atom->flags & ADDED_F_SEED_FLAG )
2375            peerHasEverything = TRUE;
2376        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2377            peerHasEverything = FALSE;
2378        else {
2379            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2380            tr_bitsetDifference( tmp, &peer->have );
2381            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2382            tr_bitfieldFree( tmp );
2383        }
2384
2385        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2386        {
2387            tordbg( t, "purging peer %s because we're both seeds",
2388                    tr_atomAddrStr( atom ) );
2389            return TR_MUST_CLOSE;
2390        }
2391    }
2392
2393    /* disconnect if it's been too long since piece data has been transferred.
2394     * this is on a sliding scale based on number of available peers... */
2395    {
2396        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2397        /* if we have >= relaxIfFewerThan, strictness is 100%.
2398         * if we have zero connections, strictness is 0% */
2399        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2400                               ? 1.0
2401                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2402        const int lo = MIN_UPLOAD_IDLE_SECS;
2403        const int hi = MAX_UPLOAD_IDLE_SECS;
2404        const int limit = hi - ( ( hi - lo ) * strictness );
2405        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2406/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2407        if( idleTime > limit ) {
2408            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2409                       tr_atomAddrStr( atom ), idleTime );
2410            return TR_CAN_CLOSE;
2411        }
2412    }
2413
2414    return TR_CAN_KEEP;
2415}
2416
2417static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2418
2419static tr_peer **
2420getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2421{
2422    int i, peerCount, outsize;
2423    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2424    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2425
2426    assert( torrentIsLocked( t ) );
2427
2428    for( i = outsize = 0; i < peerCount; ++i )
2429        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2430            ret[outsize++] = peers[i];
2431
2432    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2433
2434    *setmeSize = outsize;
2435    return ret;
2436}
2437
2438static int
2439compareCandidates( const void * va, const void * vb )
2440{
2441    const struct peer_atom * a = *(const struct peer_atom**) va;
2442    const struct peer_atom * b = *(const struct peer_atom**) vb;
2443
2444    /* <Charles> Here we would probably want to try reconnecting to
2445     * peers that had most recently given us data. Lots of users have
2446     * trouble with resets due to their routers and/or ISPs. This way we
2447     * can quickly recover from an unwanted reset. So we sort
2448     * piece_data_time in descending order.
2449     */
2450
2451    if( a->piece_data_time != b->piece_data_time )
2452        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2453
2454    if( a->numFails != b->numFails )
2455        return a->numFails < b->numFails ? -1 : 1;
2456
2457    if( a->time != b->time )
2458        return a->time < b->time ? -1 : 1;
2459
2460    /* In order to avoid fragmenting the swarm, peers from trackers and
2461     * from the DHT should be preferred to peers from PEX. */
2462    if( a->from != b->from )
2463        return a->from < b->from ? -1 : 1;
2464
2465    return 0;
2466}
2467
2468static int
2469getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2470{
2471    int sec;
2472
2473    /* if we were recently connected to this peer and transferring piece
2474     * data, try to reconnect to them sooner rather that later -- we don't
2475     * want network troubles to get in the way of a good peer. */
2476    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2477        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2478
2479    /* don't allow reconnects more often than our minimum */
2480    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2481        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2482
2483    /* otherwise, the interval depends on how many times we've tried
2484     * and failed to connect to the peer */
2485    else switch( atom->numFails ) {
2486        case 0: sec = 0; break;
2487        case 1: sec = 5; break;
2488        case 2: sec = 2 * 60; break;
2489        case 3: sec = 15 * 60; break;
2490        case 4: sec = 30 * 60; break;
2491        case 5: sec = 60 * 60; break;
2492        default: sec = 120 * 60; break;
2493    }
2494
2495    return sec;
2496}
2497
2498static struct peer_atom **
2499getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2500{
2501    int                 i, atomCount, retCount;
2502    struct peer_atom ** atoms;
2503    struct peer_atom ** ret;
2504    const int           seed = tr_torrentIsSeed( t->tor );
2505
2506    assert( torrentIsLocked( t ) );
2507
2508    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2509    ret = tr_new( struct peer_atom*, atomCount );
2510    for( i = retCount = 0; i < atomCount; ++i )
2511    {
2512        int                interval;
2513        struct peer_atom * atom = atoms[i];
2514
2515        /* peer fed us too much bad data ... we only keep it around
2516         * now to weed it out in case someone sends it to us via pex */
2517        if( atom->myflags & MYFLAG_BANNED )
2518            continue;
2519
2520        /* peer was unconnectable before, so we're not going to keep trying.
2521         * this is needs a separate flag from `banned', since if they try
2522         * to connect to us later, we'll let them in */
2523        if( atom->myflags & MYFLAG_UNREACHABLE )
2524            continue;
2525
2526        /* no need to connect if we're both seeds... */
2527        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2528                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2529            continue;
2530
2531        /* don't reconnect too often */
2532        interval = getReconnectIntervalSecs( atom, now );
2533        if( ( now - atom->time ) < interval )
2534        {
2535            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2536                    i, tr_atomAddrStr( atom ), interval );
2537            continue;
2538        }
2539
2540        /* Don't connect to peers in our blocklist */
2541        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2542            continue;
2543
2544        /* we don't need two connections to the same peer... */
2545        if( peerIsInUse( t, atom ) )
2546            continue;
2547
2548        ret[retCount++] = atom;
2549    }
2550
2551    if( retCount != 0 )
2552        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2553    *setmeSize = retCount;
2554    return ret;
2555}
2556
2557static void
2558closePeer( Torrent * t, tr_peer * peer )
2559{
2560    struct peer_atom * atom;
2561
2562    assert( t != NULL );
2563    assert( peer != NULL );
2564
2565    atom = peer->atom;
2566
2567    /* if we transferred piece data, then they might be good peers,
2568       so reset their `numFails' weight to zero.  otherwise we connected
2569       to them fruitlessly, so mark it as another fail */
2570    if( atom->piece_data_time )
2571        atom->numFails = 0;
2572    else
2573        ++atom->numFails;
2574
2575    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2576    removePeer( t, peer );
2577}
2578
2579static void
2580reconnectTorrent( Torrent * t )
2581{
2582    static time_t prevTime = 0;
2583    static int    newConnectionsThisSecond = 0;
2584    const time_t  now = tr_time( );
2585
2586    if( prevTime != now )
2587    {
2588        prevTime = now;
2589        newConnectionsThisSecond = 0;
2590    }
2591
2592    if( !t->isRunning )
2593    {
2594        removeAllPeers( t );
2595    }
2596    else
2597    {
2598        int i;
2599        int mustCloseCount;
2600        int maxCandidates;
2601        struct tr_peer ** mustClose;
2602
2603        /* disconnect the really bad peers */
2604        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2605        for( i=0; i<mustCloseCount; ++i )
2606            closePeer( t, mustClose[i] );
2607        tr_free( mustClose );
2608
2609        /* decide how many peers can we try to add in this pass */
2610        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2611        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2612        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2613
2614        /* select the best candidates, if they are requested */
2615        if( maxCandidates == 0 )
2616        {
2617            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2618                       "NO connection candidates needed, %d atoms, "
2619                       "max per pulse is %d",
2620                       t->tor->info.name, mustCloseCount,
2621                       tr_ptrArraySize( &t->pool ),
2622                       MAX_RECONNECTIONS_PER_PULSE );
2623
2624            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2625                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2626                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2627                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2628                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2629                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2630        }
2631        else
2632        {
2633            int canCloseCount = 0;
2634            int candidateCount;
2635            struct peer_atom ** candidates;
2636
2637            candidates = getPeerCandidates( t, now, &candidateCount );
2638            maxCandidates = MIN( maxCandidates, candidateCount );
2639
2640            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2641            if( maxCandidates != 0 )
2642            {
2643                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2644                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2645                   closePeer( t, canClose[i] );
2646                tr_free( canClose );
2647            }
2648
2649            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2650                       "%d can-close connections, %d connection candidates, "
2651                       "%d atoms, max per pulse is %d",
2652                       t->tor->info.name, mustCloseCount,
2653                       canCloseCount, candidateCount,
2654                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2655
2656            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2657                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2658                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2659                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2660                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2661                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2662
2663            /* add some new ones */
2664            for( i=0; i<maxCandidates; ++i )
2665            {
2666                tr_peerMgr        * mgr = t->manager;
2667                struct peer_atom  * atom = candidates[i];
2668                tr_peerIo         * io;
2669
2670                tordbg( t, "Starting an OUTGOING connection with %s",
2671                        tr_atomAddrStr( atom ) );
2672
2673                io = tr_peerIoNewOutgoing( mgr->session,
2674                                           mgr->session->bandwidth,
2675                                           &atom->addr,
2676                                           atom->port,
2677                                           t->tor->info.hash );
2678
2679                if( io == NULL )
2680                {
2681                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2682                            tr_atomAddrStr( atom ) );
2683                    atom->myflags |= MYFLAG_UNREACHABLE;
2684                }
2685                else
2686                {
2687                    tr_handshake * handshake = tr_handshakeNew( io,
2688                                                                mgr->session->encryptionMode,
2689                                                                myHandshakeDoneCB,
2690                                                                mgr );
2691
2692                    assert( tr_peerIoGetTorrentHash( io ) );
2693
2694                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2695
2696                    ++newConnectionsThisSecond;
2697
2698                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2699                                             handshakeCompare );
2700                }
2701
2702                atom->time = now;
2703            }
2704            tr_free( candidates );
2705        }
2706    }
2707}
2708
2709struct peer_liveliness
2710{
2711    tr_peer * peer;
2712    void * clientData;
2713    time_t pieceDataTime;
2714    time_t time;
2715    int speed;
2716    tr_bool doPurge;
2717};
2718
2719static int
2720comparePeerLiveliness( const void * va, const void * vb )
2721{
2722    const struct peer_liveliness * a = va;
2723    const struct peer_liveliness * b = vb;
2724
2725    if( a->doPurge != b->doPurge )
2726        return a->doPurge ? 1 : -1;
2727
2728    if( a->speed != b->speed ) /* faster goes first */
2729        return a->speed > b->speed ? -1 : 1;
2730
2731    /* the one to give us data more recently goes first */
2732    if( a->pieceDataTime != b->pieceDataTime )
2733        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2734
2735    /* the one we connected to most recently goes first */
2736    if( a->time != b->time )
2737        return a->time > b->time ? -1 : 1;
2738
2739    return 0;
2740}
2741
2742static int
2743comparePeerLivelinessReverse( const void * va, const void * vb )
2744{
2745    return -comparePeerLiveliness (va, vb);
2746}
2747
2748static void
2749sortPeersByLivelinessImpl( tr_peer  ** peers,
2750                           void     ** clientData,
2751                           int         n,
2752                           uint64_t    now,
2753                           int (*compare) ( const void *va, const void *vb ) )
2754{
2755    int i;
2756    struct peer_liveliness *lives, *l;
2757
2758    /* build a sortable array of peer + extra info */
2759    lives = l = tr_new0( struct peer_liveliness, n );
2760    for( i=0; i<n; ++i, ++l )
2761    {
2762        tr_peer * p = peers[i];
2763        l->peer = p;
2764        l->doPurge = p->doPurge;
2765        l->pieceDataTime = p->atom->piece_data_time;
2766        l->time = p->atom->time;
2767        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2768                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2769        if( clientData )
2770            l->clientData = clientData[i];
2771    }
2772
2773    /* sort 'em */
2774    assert( n == ( l - lives ) );
2775    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2776
2777    /* build the peer array */
2778    for( i=0, l=lives; i<n; ++i, ++l ) {
2779        peers[i] = l->peer;
2780        if( clientData )
2781            clientData[i] = l->clientData;
2782    }
2783    assert( n == ( l - lives ) );
2784
2785    /* cleanup */
2786    tr_free( lives );
2787}
2788
2789static void
2790sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2791{
2792    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2793}
2794
2795static void
2796sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2797{
2798    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2799}
2800
2801
2802static void
2803enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2804{
2805    int n = tr_ptrArraySize( &t->peers );
2806    const int max = tr_torrentGetPeerLimit( t->tor );
2807    if( n > max )
2808    {
2809        void * base = tr_ptrArrayBase( &t->peers );
2810        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2811        sortPeersByLiveliness( peers, NULL, n, now );
2812        while( n > max )
2813            closePeer( t, peers[--n] );
2814        tr_free( peers );
2815    }
2816}
2817
2818static void
2819enforceSessionPeerLimit( tr_session * session, uint64_t now )
2820{
2821    int n = 0;
2822    tr_torrent * tor = NULL;
2823    const int max = tr_sessionGetPeerLimit( session );
2824
2825    /* count the total number of peers */
2826    while(( tor = tr_torrentNext( session, tor )))
2827        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2828
2829    /* if there are too many, prune out the worst */
2830    if( n > max )
2831    {
2832        tr_peer ** peers = tr_new( tr_peer*, n );
2833        Torrent ** torrents = tr_new( Torrent*, n );
2834
2835        /* populate the peer array */
2836        n = 0;
2837        tor = NULL;
2838        while(( tor = tr_torrentNext( session, tor ))) {
2839            int i;
2840            Torrent * t = tor->torrentPeers;
2841            const int tn = tr_ptrArraySize( &t->peers );
2842            for( i=0; i<tn; ++i, ++n ) {
2843                peers[n] = tr_ptrArrayNth( &t->peers, i );
2844                torrents[n] = t;
2845            }
2846        }
2847
2848        /* sort 'em */
2849        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2850
2851        /* cull out the crappiest */
2852        while( n-- > max )
2853            closePeer( torrents[n], peers[n] );
2854
2855        /* cleanup */
2856        tr_free( torrents );
2857        tr_free( peers );
2858    }
2859}
2860
2861
2862static int
2863reconnectPulse( void * vmgr )
2864{
2865    tr_torrent * tor;
2866    tr_peerMgr * mgr = vmgr;
2867    uint64_t now;
2868    managerLock( mgr );
2869
2870    now = tr_date( );
2871
2872    /* if we're over the per-torrent peer limits, cull some peers */
2873    tor = NULL;
2874    while(( tor = tr_torrentNext( mgr->session, tor )))
2875        if( tor->isRunning )
2876            enforceTorrentPeerLimit( tor->torrentPeers, now );
2877
2878    /* if we're over the per-session peer limits, cull some peers */
2879    enforceSessionPeerLimit( mgr->session, now );
2880
2881    tor = NULL;
2882    while(( tor = tr_torrentNext( mgr->session, tor )))
2883        if( tor->isRunning )
2884            reconnectTorrent( tor->torrentPeers );
2885
2886    managerUnlock( mgr );
2887    return TRUE;
2888}
2889
2890/****
2891*****
2892*****  BANDWIDTH ALLOCATION
2893*****
2894****/
2895
2896static void
2897pumpAllPeers( tr_peerMgr * mgr )
2898{
2899    tr_torrent * tor = NULL;
2900
2901    while(( tor = tr_torrentNext( mgr->session, tor )))
2902    {
2903        int j;
2904        Torrent * t = tor->torrentPeers;
2905
2906        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2907        {
2908            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2909            tr_peerMsgsPulse( peer->msgs );
2910        }
2911    }
2912}
2913
2914static int
2915bandwidthPulse( void * vmgr )
2916{
2917    tr_torrent * tor = NULL;
2918    tr_peerMgr * mgr = vmgr;
2919    managerLock( mgr );
2920
2921    /* FIXME: this next line probably isn't necessary... */
2922    pumpAllPeers( mgr );
2923
2924    /* allocate bandwidth to the peers */
2925    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2926    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2927
2928    /* possibly stop torrents that have seeded enough */
2929    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2930        if( tor->needsSeedRatioCheck ) {
2931            tor->needsSeedRatioCheck = FALSE;
2932            tr_torrentCheckSeedRatio( tor );
2933        }
2934    }
2935
2936    /* run the completeness check for any torrents that need it */
2937    tor = NULL;
2938    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2939        if( tor->torrentPeers->needsCompletenessCheck ) {
2940            tor->torrentPeers->needsCompletenessCheck  = FALSE;
2941            tr_torrentRecheckCompleteness( tor );
2942        }
2943    }
2944
2945    /* possibly stop torrents that have an error */
2946    tor = NULL;
2947    while(( tor = tr_torrentNext( mgr->session, tor )))
2948        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2949            tr_torrentStop( tor );
2950
2951    managerUnlock( mgr );
2952    return TRUE;
2953}
2954
2955/***
2956****
2957***/
2958
2959static int
2960compareAtomPtrsByAddress( const void * va, const void *vb )
2961{
2962    const struct peer_atom * a = * (const struct peer_atom**) va;
2963    const struct peer_atom * b = * (const struct peer_atom**) vb;
2964
2965    assert( tr_isAtom( a ) );
2966    assert( tr_isAtom( b ) );
2967
2968    return tr_compareAddresses( &a->addr, &b->addr );
2969}
2970
2971static time_t tr_now = 0;
2972
2973/* best come first, worst go last */
2974static int
2975compareAtomPtrsByShelfDate( const void * va, const void *vb )
2976{
2977    time_t atime;
2978    time_t btime;
2979    const struct peer_atom * a = * (const struct peer_atom**) va;
2980    const struct peer_atom * b = * (const struct peer_atom**) vb;
2981    const int data_time_cutoff_secs = 60 * 60;
2982
2983    assert( tr_isAtom( a ) );
2984    assert( tr_isAtom( b ) );
2985
2986    /* primary key: the last piece data time *if* it was within the last hour */
2987    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
2988    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
2989    if( atime != btime )
2990        return atime > btime ? -1 : 1;
2991
2992    /* secondary key: shelf date. */
2993    if( a->shelf_date != b->shelf_date )
2994        return a->shelf_date > b->shelf_date ? -1 : 1;
2995
2996    return 0;
2997}
2998
2999static int
3000getMaxAtomCount( const tr_torrent * tor )
3001{
3002    /* FIXME: this curve should be smoother... */
3003    const int n = tor->maxConnectedPeers;
3004    if( n >= 200 ) return n * 1.5;
3005    if( n >= 100 ) return n * 2;
3006    if( n >=  50 ) return n * 3;
3007    if( n >=  20 ) return n * 5;
3008    return n * 10;
3009}
3010
3011static int
3012atomPulse( void * vmgr )
3013{
3014    tr_torrent * tor = NULL;
3015    tr_peerMgr * mgr = vmgr;
3016    managerLock( mgr );
3017
3018    while(( tor = tr_torrentNext( mgr->session, tor )))
3019    {
3020        int atomCount;
3021        Torrent * t = tor->torrentPeers;
3022        const int maxAtomCount = getMaxAtomCount( tor );
3023        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3024
3025        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3026        {
3027            int i;
3028            int keepCount = 0;
3029            int testCount = 0;
3030            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3031            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3032
3033            /* keep the ones that are in use */
3034            for( i=0; i<atomCount; ++i ) {
3035                struct peer_atom * atom = atoms[i];
3036                if( peerIsInUse( t, atom ) )
3037                    keep[keepCount++] = atom;
3038                else
3039                    test[testCount++] = atom;
3040            }
3041
3042            /* if there's room, keep the best of what's left */
3043            i = 0;
3044            if( keepCount < maxAtomCount ) {
3045                tr_now = tr_time( );
3046                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3047                while( i<testCount && keepCount<maxAtomCount )
3048                    keep[keepCount++] = test[i++];
3049            }
3050
3051            /* free the culled atoms */
3052            while( i<testCount )
3053                tr_free( test[i++] );
3054
3055            /* rebuild Torrent.pool with what's left */
3056            tr_ptrArrayDestruct( &t->pool, NULL );
3057            t->pool = TR_PTR_ARRAY_INIT;
3058            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3059            for( i=0; i<keepCount; ++i )
3060                tr_ptrArrayAppend( &t->pool, keep[i] );
3061
3062            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3063
3064            /* cleanup */
3065            tr_free( test );
3066            tr_free( keep );
3067        }
3068    }
3069
3070    managerUnlock( mgr );
3071    return TRUE;
3072}
Note: See TracBrowser for help on using the repository browser.