source: trunk/libtransmission/peer-mgr.c @ 10085

Last change on this file since 10085 was 10085, checked in by charles, 12 years ago

(trunk libT) taper off the number of new connections per pulse per torrent based on how long the torrent's been running. Brand-new torrents get a higher burst of new peer connection attempts, but long-running torrents don't need that kind of activity.

  • Property svn:keywords set to Date Rev Author Id
File size: 93.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 10085 2010-02-03 00:12:19Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max number of peers to ask for per second overall.
64    * this throttle is to avoid overloading the router */
65    MAX_CONNECTIONS_PER_SECOND = 16,
66
67    /* number of bad pieces a peer is allowed to send before we ban them */
68    MAX_BAD_PIECES_PER_PEER = 5,
69
70    /* amount of time to keep a list of request pieces lying around
71       before it's considered too old and needs to be rebuilt */
72    PIECE_LIST_SHELF_LIFE_SECS = 60,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
84
85    /** how long we'll let requests we've made linger before we cancel them */
86    REQUEST_TTL_SECS = 45
87};
88
89
90/**
91***
92**/
93
94enum
95{
96    UPLOAD_ONLY_UKNOWN,
97    UPLOAD_ONLY_YES,
98    UPLOAD_ONLY_NO
99};
100
101/**
102 * Peer information that should be kept even before we've connected and
103 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
104 * which ones would make good candidates for connecting to, and to watch out
105 * for banned peers.
106 *
107 * @see tr_peer
108 * @see tr_peermsgs
109 */
110struct peer_atom
111{
112    tr_peer   * peer;        /* will be NULL if not connected */
113    uint8_t     from;
114    uint8_t     flags;       /* these match the added_f flags */
115    uint8_t     myflags;     /* flags that aren't defined in added_f */
116    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
117    tr_port     port;
118    uint16_t    numFails;
119    tr_address  addr;
120    time_t      time;        /* when the peer's connection status last changed */
121    time_t      piece_data_time;
122
123    /* similar to a TTL field, but less rigid --
124     * if the swarm is small, the atom will be kept past this date. */
125    time_t      shelf_date;
126};
127
128#ifdef NDEBUG
129#define tr_isAtom(a) (TRUE)
130#else
131static tr_bool
132tr_isAtom( const struct peer_atom * atom )
133{
134    return ( atom != NULL )
135        && ( atom->from < TR_PEER_FROM__MAX )
136        && ( tr_isAddress( &atom->addr ) );
137}
138#endif
139
140static const char*
141tr_atomAddrStr( const struct peer_atom * atom )
142{
143    return tr_peerIoAddrStr( &atom->addr, atom->port );
144}
145
146struct block_request
147{
148    tr_block_index_t block;
149    tr_peer * peer;
150    time_t sentAt;
151};
152
153struct weighted_piece
154{
155    tr_piece_index_t index;
156    int16_t salt;
157    int16_t requestCount;
158};
159
160/** @brief Opaque, per-torrent data structure for peer connection information */
161typedef struct tr_torrent_peers
162{
163    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
164    tr_ptrArray                pool; /* struct peer_atom */
165    tr_ptrArray                peers; /* tr_peer */
166    tr_ptrArray                webseeds; /* tr_webseed */
167
168    tr_torrent               * tor;
169    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
170    struct tr_peerMgr        * manager;
171
172    tr_bool                    isRunning;
173    tr_bool                    needsCompletenessCheck;
174
175    struct block_request     * requests;
176    int                        requestsSort;
177    int                        requestCount;
178    int                        requestAlloc;
179
180    struct weighted_piece    * pieces;
181    int                        pieceCount;
182}
183Torrent;
184
185struct tr_peerMgr
186{
187    tr_session    * session;
188    tr_ptrArray     incomingHandshakes; /* tr_handshake */
189    struct event  * bandwidthTimer;
190    struct event  * rechokeTimer;
191    struct event  * reconnectTimer;
192    struct event  * refillUpkeepTimer;
193    struct event  * atomTimer;
194};
195
196#define tordbg( t, ... ) \
197    do { \
198        if( tr_deepLoggingIsActive( ) ) \
199            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
200    } while( 0 )
201
202#define dbgmsg( ... ) \
203    do { \
204        if( tr_deepLoggingIsActive( ) ) \
205            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
206    } while( 0 )
207
208/**
209***
210**/
211
212static inline void
213managerLock( const struct tr_peerMgr * manager )
214{
215    tr_sessionLock( manager->session );
216}
217
218static inline void
219managerUnlock( const struct tr_peerMgr * manager )
220{
221    tr_sessionUnlock( manager->session );
222}
223
224static inline void
225torrentLock( Torrent * torrent )
226{
227    managerLock( torrent->manager );
228}
229
230static inline void
231torrentUnlock( Torrent * torrent )
232{
233    managerUnlock( torrent->manager );
234}
235
236static inline int
237torrentIsLocked( const Torrent * t )
238{
239    return tr_sessionIsLocked( t->manager->session );
240}
241
242/**
243***
244**/
245
246static int
247handshakeCompareToAddr( const void * va, const void * vb )
248{
249    const tr_handshake * a = va;
250
251    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
252}
253
254static int
255handshakeCompare( const void * a, const void * b )
256{
257    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
258}
259
260static tr_handshake*
261getExistingHandshake( tr_ptrArray      * handshakes,
262                      const tr_address * addr )
263{
264    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
265}
266
267static int
268comparePeerAtomToAddress( const void * va, const void * vb )
269{
270    const struct peer_atom * a = va;
271
272    return tr_compareAddresses( &a->addr, vb );
273}
274
275static int
276compareAtomsByAddress( const void * va, const void * vb )
277{
278    const struct peer_atom * b = vb;
279
280    assert( tr_isAtom( b ) );
281
282    return comparePeerAtomToAddress( va, &b->addr );
283}
284
285/**
286***
287**/
288
289const tr_address *
290tr_peerAddress( const tr_peer * peer )
291{
292    return &peer->atom->addr;
293}
294
295static Torrent*
296getExistingTorrent( tr_peerMgr *    manager,
297                    const uint8_t * hash )
298{
299    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
300
301    return tor == NULL ? NULL : tor->torrentPeers;
302}
303
304static int
305peerCompare( const void * a, const void * b )
306{
307    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
308}
309
310static struct peer_atom*
311getExistingAtom( const Torrent    * t,
312                 const tr_address * addr )
313{
314    Torrent * tt = (Torrent*)t;
315    assert( torrentIsLocked( t ) );
316    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
317}
318
319static tr_bool
320peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
321{
322    Torrent * t = (Torrent*) ct;
323
324    assert( torrentIsLocked ( t ) );
325
326    return ( atom->peer != NULL )
327        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
328        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
329}
330
331static tr_peer*
332peerConstructor( struct peer_atom * atom )
333{
334    tr_peer * peer = tr_new0( tr_peer, 1 );
335    tr_bitsetConstructor( &peer->have, 0 );
336    peer->atom = atom;
337    atom->peer = peer;
338    return peer;
339}
340
341static tr_peer*
342getPeer( Torrent * torrent, struct peer_atom * atom )
343{
344    tr_peer * peer;
345
346    assert( torrentIsLocked( torrent ) );
347
348    peer = atom->peer;
349
350    if( peer == NULL )
351    {
352        peer = peerConstructor( atom );
353        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
354    }
355
356    return peer;
357}
358
359static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
360
361static void
362peerDestructor( Torrent * t, tr_peer * peer )
363{
364    assert( peer != NULL );
365
366    peerDeclinedAllRequests( t, peer );
367
368    if( peer->msgs != NULL )
369    {
370        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
371        tr_peerMsgsFree( peer->msgs );
372    }
373
374    tr_peerIoClear( peer->io );
375    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
376
377    tr_bitsetDestructor( &peer->have );
378    tr_bitfieldFree( peer->blame );
379    tr_free( peer->client );
380    peer->atom->peer = NULL;
381
382    tr_free( peer );
383}
384
385static void
386removePeer( Torrent * t, tr_peer * peer )
387{
388    tr_peer * removed;
389    struct peer_atom * atom = peer->atom;
390
391    assert( torrentIsLocked( t ) );
392    assert( atom );
393
394    atom->time = tr_time( );
395
396    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
397    assert( removed == peer );
398    peerDestructor( t, removed );
399}
400
401static void
402removeAllPeers( Torrent * t )
403{
404    while( !tr_ptrArrayEmpty( &t->peers ) )
405        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
406}
407
408static void
409torrentDestructor( void * vt )
410{
411    Torrent * t = vt;
412
413    assert( t );
414    assert( !t->isRunning );
415    assert( torrentIsLocked( t ) );
416    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
417    assert( tr_ptrArrayEmpty( &t->peers ) );
418
419    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
420    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
421    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
422    tr_ptrArrayDestruct( &t->peers, NULL );
423
424    tr_free( t->requests );
425    tr_free( t->pieces );
426    tr_free( t );
427}
428
429static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
430
431static Torrent*
432torrentConstructor( tr_peerMgr * manager,
433                    tr_torrent * tor )
434{
435    int       i;
436    Torrent * t;
437
438    t = tr_new0( Torrent, 1 );
439    t->manager = manager;
440    t->tor = tor;
441    t->pool = TR_PTR_ARRAY_INIT;
442    t->peers = TR_PTR_ARRAY_INIT;
443    t->webseeds = TR_PTR_ARRAY_INIT;
444    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
445    t->requests = 0;
446
447    for( i = 0; i < tor->info.webseedCount; ++i )
448    {
449        tr_webseed * w =
450            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
451        tr_ptrArrayAppend( &t->webseeds, w );
452    }
453
454    return t;
455}
456
457tr_peerMgr*
458tr_peerMgrNew( tr_session * session )
459{
460    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
461    m->session = session;
462    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
463    return m;
464}
465
466static void
467deleteTimer( struct event ** t )
468{
469    if( *t != NULL )
470    {
471        evtimer_del( *t );
472        tr_free( *t );
473        *t = NULL;
474    }
475}
476
477static void
478deleteTimers( struct tr_peerMgr * m )
479{
480    deleteTimer( &m->atomTimer );
481    deleteTimer( &m->bandwidthTimer );
482    deleteTimer( &m->rechokeTimer );
483    deleteTimer( &m->reconnectTimer );
484    deleteTimer( &m->refillUpkeepTimer );
485}
486
487void
488tr_peerMgrFree( tr_peerMgr * manager )
489{
490    managerLock( manager );
491
492    deleteTimers( manager );
493
494    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
495     * the item from manager->handshakes, so this is a little roundabout... */
496    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
497        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
498
499    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
500
501    managerUnlock( manager );
502    tr_free( manager );
503}
504
505static int
506clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
507{
508    if( !tr_torrentHasMetadata( tor ) )
509        return TRUE;
510
511    return peer->clientIsInterested && !peer->clientIsChoked;
512}
513
514static int
515clientIsUploadingTo( const tr_peer * peer )
516{
517    return peer->peerIsInterested && !peer->peerIsChoked;
518}
519
520/***
521****
522***/
523
524tr_bool
525tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
526                      const tr_address  * addr )
527{
528    tr_bool isSeed = FALSE;
529    const Torrent * t = tor->torrentPeers;
530    const struct peer_atom * atom = getExistingAtom( t, addr );
531
532    if( atom )
533        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
534
535    return isSeed;
536}
537
538/**
539***  REQUESTS
540***
541*** There are two data structures associated with managing block requests:
542***
543*** 1. Torrent::requests, an array of "struct block_request" which keeps
544***    track of which blocks have been requested, and when, and by which peers.
545***    This is list is used for (a) cancelling requests that have been pending
546***    for too long and (b) avoiding duplicate requests before endgame.
547***
548*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
549***    pieces that we want to request.  It's used to decide which pieces to
550***    return next when tr_peerMgrGetBlockRequests() is called.
551**/
552
553/**
554*** struct block_request
555**/
556
557enum
558{
559    REQ_UNSORTED,
560    REQ_SORTED_BY_BLOCK,
561    REQ_SORTED_BY_TIME
562};
563
564static int
565compareReqByBlock( const void * va, const void * vb )
566{
567    const struct block_request * a = va;
568    const struct block_request * b = vb;
569
570    /* primary key: block */
571    if( a->block < b->block ) return -1;
572    if( a->block > b->block ) return 1;
573
574    /* secondary key: peer */
575    if( a->peer < b->peer ) return -1;
576    if( a->peer > b->peer ) return 1;
577
578    return 0;
579}
580
581static int
582compareReqByTime( const void * va, const void * vb )
583{
584    const struct block_request * a = va;
585    const struct block_request * b = vb;
586
587    /* primary key: time */
588    if( a->sentAt < b->sentAt ) return -1;
589    if( a->sentAt > b->sentAt ) return 1;
590
591    /* secondary key: peer */
592    if( a->peer < b->peer ) return -1;
593    if( a->peer > b->peer ) return 1;
594
595    return 0;
596}
597
598static void
599requestListSort( Torrent * t, int mode )
600{
601    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
602
603    if( t->requestsSort != mode )
604    {
605        int(*compar)(const void *, const void *);
606
607        t->requestsSort = mode;
608
609        switch( mode ) {
610            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
611            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
612            default: assert( 0 && "unhandled" );
613        }
614
615        qsort( t->requests, t->requestCount,
616               sizeof( struct block_request ), compar );
617    }
618}
619
620static void
621requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
622{
623    struct block_request key;
624
625    /* ensure enough room is available... */
626    if( t->requestCount + 1 >= t->requestAlloc )
627    {
628        const int CHUNK_SIZE = 128;
629        t->requestAlloc += CHUNK_SIZE;
630        t->requests = tr_renew( struct block_request,
631                                t->requests, t->requestAlloc );
632    }
633
634    /* populate the record we're inserting */
635    key.block = block;
636    key.peer = peer;
637    key.sentAt = tr_time( );
638
639    /* insert the request to our array... */
640    switch( t->requestsSort )
641    {
642        case REQ_UNSORTED:
643        case REQ_SORTED_BY_TIME:
644            t->requests[t->requestCount++] = key;
645            break;
646
647        case REQ_SORTED_BY_BLOCK: {
648            tr_bool exact;
649            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
650                                           sizeof( struct block_request ),
651                                           compareReqByBlock, &exact );
652            assert( !exact );
653            memmove( t->requests + pos + 1,
654                     t->requests + pos,
655                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
656            t->requests[pos] = key;
657            break;
658        }
659    }
660
661    if( peer != NULL )
662    {
663        ++peer->pendingReqsToPeer;
664        assert( peer->pendingReqsToPeer >= 0 );
665    }
666
667    /*fprintf( stderr, "added request of block %lu from peer %s... "
668                       "there are now %d block\n",
669                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
670}
671
672static struct block_request *
673requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
674{
675    struct block_request key;
676    key.block = block;
677    key.peer = (tr_peer*) peer;
678
679    requestListSort( t, REQ_SORTED_BY_BLOCK );
680
681    return bsearch( &key, t->requests, t->requestCount,
682                    sizeof( struct block_request ),
683                    compareReqByBlock );
684}
685
686/* how many peers are we currently requesting this block from... */
687static int
688countBlockRequests( Torrent * t, tr_block_index_t block )
689{
690    tr_bool exact;
691    int i, n, pos;
692    struct block_request key;
693
694    requestListSort( t, REQ_SORTED_BY_BLOCK );
695    key.block = block;
696    key.peer = NULL;
697    pos = tr_lowerBound( &key, t->requests, t->requestCount,
698                         sizeof( struct block_request ),
699                         compareReqByBlock, &exact );
700
701    assert( !exact ); /* shouldn't have a request with .peer == NULL */
702
703    n = 0;
704    for( i=pos; i<t->requestCount; ++i ) {
705        if( t->requests[i].block == block )
706            ++n;
707        else
708            break;
709    }
710
711    return n;
712}
713
714static void
715decrementPendingReqCount( const struct block_request * b )
716{
717    if( b->peer != NULL )
718        if( b->peer->pendingReqsToPeer > 0 )
719            --b->peer->pendingReqsToPeer;
720}
721
722static void
723requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
724{
725    const struct block_request * b = requestListLookup( t, block, peer );
726    if( b != NULL )
727    {
728        const int pos = b - t->requests;
729        assert( pos < t->requestCount );
730
731        decrementPendingReqCount( b );
732
733        memmove( t->requests + pos,
734                 t->requests + pos + 1,
735                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
736        /*fprintf( stderr, "removing request of block %lu from peer %s... "
737                           "there are now %d block requests left\n",
738                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
739    }
740}
741
742/**
743*** struct weighted_piece
744**/
745
746enum
747{
748    PIECES_UNSORTED,
749    PIECES_SORTED_BY_INDEX,
750    PIECES_SORTED_BY_WEIGHT
751};
752
753const tr_torrent * weightTorrent;
754
755/* we try to create a "weight" s.t. high-priority pieces come before others,
756 * and that partially-complete pieces come before empty ones. */
757static int
758comparePieceByWeight( const void * va, const void * vb )
759{
760    const struct weighted_piece * a = va;
761    const struct weighted_piece * b = vb;
762    int ia, ib, missing, pending;
763    const tr_torrent * tor = weightTorrent;
764
765    /* primary key: weight */
766    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
767    pending = a->requestCount;
768    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
769    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
770    pending = b->requestCount;
771    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
772    if( ia < ib ) return -1;
773    if( ia > ib ) return 1;
774
775    /* secondary key: higher priorities go first */
776    ia = tor->info.pieces[a->index].priority;
777    ib = tor->info.pieces[b->index].priority;
778    if( ia > ib ) return -1;
779    if( ia < ib ) return 1;
780
781    /* tertiary key: random */
782    if( a->salt < b->salt ) return -1;
783    if( a->salt > b->salt ) return 1;
784
785    /* okay, they're equal */
786    return 0;
787}
788
789static int
790comparePieceByIndex( const void * va, const void * vb )
791{
792    const struct weighted_piece * a = va;
793    const struct weighted_piece * b = vb;
794    if( a->index < b->index ) return -1;
795    if( a->index > b->index ) return 1;
796    return 0;
797}
798
799static void
800pieceListSort( Torrent * t, int mode )
801{
802    int(*compar)(const void *, const void *);
803
804    assert( mode==PIECES_SORTED_BY_INDEX
805         || mode==PIECES_SORTED_BY_WEIGHT );
806
807    switch( mode ) {
808        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
809        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
810        default: assert( 0 && "unhandled" );  break;
811    }
812
813    weightTorrent = t->tor;
814    qsort( t->pieces, t->pieceCount,
815           sizeof( struct weighted_piece ), compar );
816}
817
818static tr_bool
819isInEndgame( Torrent * t )
820{
821    tr_bool endgame = TRUE;
822
823    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
824    {
825        const tr_completion * cp = &t->tor->completion;
826        const struct weighted_piece * p = t->pieces;
827        const int pending = p->requestCount;
828        const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
829        endgame = pending >= missing;
830    }
831
832    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
833    return endgame;
834}
835
836static struct weighted_piece *
837pieceListLookup( Torrent * t, tr_piece_index_t index )
838{
839    const struct weighted_piece * limit = t->pieces;
840    struct weighted_piece * piece = t->pieces + t->pieceCount - 1;
841
842    if ( t->pieceCount == 0 ) return NULL;
843
844    /* reverse linear search */
845    for( ;; )
846    { 
847        if( piece < limit ) return NULL;
848        if( index == piece->index ) return piece; else --piece;
849    }
850}
851
852static void
853pieceListRebuild( Torrent * t )
854{
855    if( !tr_torrentIsSeed( t->tor ) )
856    {
857        tr_piece_index_t i;
858        tr_piece_index_t * pool;
859        tr_piece_index_t poolCount = 0;
860        const tr_torrent * tor = t->tor;
861        const tr_info * inf = tr_torrentInfo( tor );
862        struct weighted_piece * pieces;
863        int pieceCount;
864
865        /* build the new list */
866        pool = tr_new( tr_piece_index_t, inf->pieceCount );
867        for( i=0; i<inf->pieceCount; ++i )
868            if( !inf->pieces[i].dnd )
869                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
870                    pool[poolCount++] = i;
871        pieceCount = poolCount;
872        pieces = tr_new0( struct weighted_piece, pieceCount );
873        for( i=0; i<poolCount; ++i ) {
874            struct weighted_piece * piece = pieces + i;
875            piece->index = pool[i];
876            piece->requestCount = 0;
877            piece->salt = tr_cryptoWeakRandInt( 4096 );
878        }
879
880        /* if we already had a list of pieces, merge it into
881         * the new list so we don't lose its requestCounts */
882        if( t->pieces != NULL )
883        {
884            struct weighted_piece * o = t->pieces;
885            struct weighted_piece * oend = o + t->pieceCount;
886            struct weighted_piece * n = pieces;
887            struct weighted_piece * nend = n + pieceCount;
888
889            pieceListSort( t, PIECES_SORTED_BY_INDEX );
890
891            while( o!=oend && n!=nend ) {
892                if( o->index < n->index )
893                    ++o;
894                else if( o->index > n->index )
895                    ++n;
896                else
897                    *n++ = *o++;
898            }
899
900            tr_free( t->pieces );
901        }
902
903        t->pieces = pieces;
904        t->pieceCount = pieceCount;
905
906        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
907
908        /* cleanup */
909        tr_free( pool );
910    }
911}
912
913static void
914pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
915{
916    struct weighted_piece * p = pieceListLookup( t, piece );
917
918    if( p != NULL )
919    {
920        const int pos = p - t->pieces;
921
922        memmove( t->pieces + pos,
923                 t->pieces + pos + 1,
924                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
925
926        if( t->pieceCount == 0 )
927        {
928            tr_free( t->pieces );
929            t->pieces = NULL;
930        }
931    }
932}
933
934static void
935pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
936{
937    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
938    const struct weighted_piece * p = pieceListLookup( t, index );
939
940    if( p != NULL )
941    {
942        const int pos = p - t->pieces;
943        struct weighted_piece piece = t->pieces[pos];
944        int newpos;
945        tr_bool exact;
946
947        /* remove request */
948        if( piece.requestCount > 0 )
949            --piece.requestCount;
950
951        /* List is nearly sorted (by weight) : insert piece into the right place. */
952
953        weightTorrent = t->tor;
954        newpos = tr_lowerBound( &piece, t->pieces, t->pieceCount,
955                                sizeof( struct weighted_piece ),
956                                comparePieceByWeight, &exact );
957
958        if( newpos == pos || newpos == pos + 1 )
959        {
960            /* it's VERY likely that piece keeps its position */
961            t->pieces[pos].requestCount = piece.requestCount;
962        }
963        else
964        {
965            /* piece is removed temporally to make insertion easier */
966            memmove( &t->pieces[pos],
967                     &t->pieces[pos + 1],
968                     sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
969
970            if( newpos > pos ) --newpos;
971
972            memmove( &t->pieces[newpos + 1],
973                     &t->pieces[newpos],
974                     sizeof( struct weighted_piece ) * ( t->pieceCount++ - newpos ) );
975
976            t->pieces[newpos] = piece;
977        }
978    }
979}
980
981/**
982***
983**/
984
985void
986tr_peerMgrRebuildRequests( tr_torrent * tor )
987{
988    assert( tr_isTorrent( tor ) );
989
990    pieceListRebuild( tor->torrentPeers );
991}
992
993void
994tr_peerMgrGetNextRequests( tr_torrent           * tor,
995                           tr_peer              * peer,
996                           int                    numwant,
997                           tr_block_index_t     * setme,
998                           int                  * numgot )
999{
1000    int i;
1001    int got;
1002    Torrent * t;
1003    tr_bool endgame;
1004    struct weighted_piece * pieces;
1005    const tr_bitset * have = &peer->have;
1006
1007    /* sanity clause */
1008    assert( tr_isTorrent( tor ) );
1009    assert( numwant > 0 );
1010
1011    /* walk through the pieces and find blocks that should be requested */
1012    got = 0;
1013    t = tor->torrentPeers;
1014
1015    /* prep the pieces list */
1016    if( t->pieces == NULL )
1017        pieceListRebuild( t );
1018
1019    endgame = isInEndgame( t );
1020
1021    pieces = t->pieces;
1022    for( i=0; i<t->pieceCount && got<numwant; ++i )
1023    {
1024        struct weighted_piece * p = pieces + i;
1025        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1026        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1027
1028        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1029            continue;
1030
1031        /* if the peer has this piece that we want... */
1032        if( tr_bitsetHasFast( have, p->index ) )
1033        {
1034            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1035            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1036
1037            for( ; b!=e && got<numwant; ++b )
1038            {
1039                /* don't request blocks we've already got */
1040                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1041                    continue;
1042
1043                /* don't send the same request to the same peer twice */
1044                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1045                    continue;
1046
1047                /* don't send the same request to any peer too many times */
1048                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1049                    continue;
1050
1051                /* update the caller's table */
1052                setme[got++] = b;
1053
1054                /* update our own tables */
1055                requestListAdd( t, b, peer );
1056                ++p->requestCount;
1057            }
1058        }
1059    }
1060
1061    /* In most cases we've just changed the weights of a small number of pieces.
1062     * So rather than qsort()ing the entire array, it's faster to apply an
1063     * adaptive insertion sort algorithm. */
1064    if( got > 0 )
1065    {
1066        /* not enough requests || last piece modified */
1067        if ( i == t->pieceCount ) --i;
1068
1069        weightTorrent = t->tor;
1070        while( --i >= 0 )
1071        {
1072            tr_bool exact;
1073
1074            /* relative position! */
1075            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1076                                              t->pieceCount - (i + 1),
1077                                              sizeof( struct weighted_piece ),
1078                                              comparePieceByWeight, &exact );
1079            if( newpos > 0 )
1080            {
1081                const struct weighted_piece piece = t->pieces[i];
1082                memmove( &t->pieces[i],
1083                         &t->pieces[i + 1],
1084                         sizeof( struct weighted_piece ) * ( newpos ) );
1085                t->pieces[i + newpos] = piece;
1086            }
1087        }
1088    }
1089
1090    *numgot = got;
1091}
1092
1093tr_bool
1094tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1095                          const tr_peer     * peer,
1096                          tr_block_index_t    block )
1097{
1098    const Torrent * t = tor->torrentPeers;
1099    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1100}
1101
1102/* cancel requests that are too old */
1103static void
1104refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1105{
1106    time_t now;
1107    time_t too_old;
1108    tr_torrent * tor;
1109    tr_peerMgr * mgr = vmgr;
1110    managerLock( mgr );
1111
1112    now = tr_time( );
1113    too_old = now - REQUEST_TTL_SECS;
1114
1115    tor = NULL;
1116    while(( tor = tr_torrentNext( mgr->session, tor )))
1117    {
1118        Torrent * t = tor->torrentPeers;
1119        const int n = t->requestCount;
1120        if( n > 0 )
1121        {
1122            int keepCount = 0;
1123            int cancelCount = 0;
1124            struct block_request * cancel = tr_new( struct block_request, n );
1125            const struct block_request * it;
1126            const struct block_request * end;
1127
1128            for( it=t->requests, end=it+n; it!=end; ++it )
1129            {
1130                if( it->sentAt <= too_old )
1131                    cancel[cancelCount++] = *it;
1132                else
1133                {
1134                    if( it != &t->requests[keepCount] )
1135                        t->requests[keepCount] = *it;
1136                    keepCount++;
1137                }
1138            }
1139
1140            /* prune out the ones we aren't keeping */
1141            t->requestCount = keepCount;
1142
1143            /* send cancel messages for all the "cancel" ones */
1144            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1145                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1146                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1147                    decrementPendingReqCount( it );
1148                }
1149            }
1150
1151            /* decrement the pending request counts for the timed-out blocks */
1152            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1153                pieceListRemoveRequest( t, it->block );
1154
1155            /* cleanup loop */
1156            tr_free( cancel );
1157        }
1158    }
1159
1160    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1161    managerUnlock( mgr );
1162}
1163
1164static void
1165addStrike( Torrent * t, tr_peer * peer )
1166{
1167    tordbg( t, "increasing peer %s strike count to %d",
1168            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1169
1170    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1171    {
1172        struct peer_atom * atom = peer->atom;
1173        atom->myflags |= MYFLAG_BANNED;
1174        peer->doPurge = 1;
1175        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1176    }
1177}
1178
1179static void
1180gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1181{
1182    tr_torrent *   tor = t->tor;
1183    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1184
1185    tor->corruptCur += byteCount;
1186    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1187}
1188
1189static void
1190peerSuggestedPiece( Torrent            * t UNUSED,
1191                    tr_peer            * peer UNUSED,
1192                    tr_piece_index_t     pieceIndex UNUSED,
1193                    int                  isFastAllowed UNUSED )
1194{
1195#if 0
1196    assert( t );
1197    assert( peer );
1198    assert( peer->msgs );
1199
1200    /* is this a valid piece? */
1201    if(  pieceIndex >= t->tor->info.pieceCount )
1202        return;
1203
1204    /* don't ask for it if we've already got it */
1205    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1206        return;
1207
1208    /* don't ask for it if they don't have it */
1209    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1210        return;
1211
1212    /* don't ask for it if we're choked and it's not fast */
1213    if( !isFastAllowed && peer->clientIsChoked )
1214        return;
1215
1216    /* request the blocks that we don't have in this piece */
1217    {
1218        tr_block_index_t block;
1219        const tr_torrent * tor = t->tor;
1220        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1221        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1222
1223        for( block=start; block<end; ++block )
1224        {
1225            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1226            {
1227                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1228                const uint32_t length = tr_torBlockCountBytes( tor, block );
1229                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1230                incrementPieceRequests( t, pieceIndex );
1231            }
1232        }
1233    }
1234#endif
1235}
1236
1237static void
1238decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1239{
1240    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1241}
1242
1243static void
1244clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1245{
1246    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1247}
1248
1249static void
1250removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1251{
1252    requestListRemove( t, block, peer );
1253    pieceListRemoveRequest( t, block );
1254}
1255
1256/* peer choked us, or maybe it disconnected.
1257   either way we need to remove all its requests */
1258static void
1259peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1260{
1261    int i, n;
1262    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1263
1264    for( i=n=0; i<t->requestCount; ++i )
1265        if( peer == t->requests[i].peer )
1266            blocks[n++] = t->requests[i].block;
1267
1268    for( i=0; i<n; ++i )
1269        removeRequestFromTables( t, blocks[i], peer );
1270
1271    tr_free( blocks );
1272}
1273
1274static void
1275peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1276{
1277    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1278    Torrent * t = vt;
1279    const tr_peer_event * e = vevent;
1280
1281    torrentLock( t );
1282
1283    switch( e->eventType )
1284    {
1285        case TR_PEER_UPLOAD_ONLY:
1286            /* update our atom */
1287            if( peer ) {
1288                if( e->uploadOnly ) {
1289                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1290                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1291                } else {
1292                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1293                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1294                }
1295            }
1296            break;
1297
1298        case TR_PEER_PEER_GOT_DATA:
1299        {
1300            const time_t now = tr_time( );
1301            tr_torrent * tor = t->tor;
1302
1303            tr_torrentSetActivityDate( tor, now );
1304
1305            if( e->wasPieceData ) {
1306                tor->uploadedCur += e->length;
1307                tr_torrentSetDirty( tor );
1308            }
1309
1310            /* update the stats */
1311            if( e->wasPieceData )
1312                tr_statsAddUploaded( tor->session, e->length );
1313
1314            /* update our atom */
1315            if( peer && e->wasPieceData )
1316                peer->atom->piece_data_time = now;
1317
1318            tor->needsSeedRatioCheck = TRUE;
1319
1320            break;
1321        }
1322
1323        case TR_PEER_CLIENT_GOT_REJ:
1324            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1325            break;
1326
1327        case TR_PEER_CLIENT_GOT_CHOKE:
1328            peerDeclinedAllRequests( t, peer );
1329            break;
1330
1331        case TR_PEER_CLIENT_GOT_PORT:
1332            if( peer )
1333                peer->atom->port = e->port;
1334            break;
1335
1336        case TR_PEER_CLIENT_GOT_SUGGEST:
1337            if( peer )
1338                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1339            break;
1340
1341        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1342            if( peer )
1343                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1344            break;
1345
1346        case TR_PEER_CLIENT_GOT_DATA:
1347        {
1348            const time_t now = tr_time( );
1349            tr_torrent * tor = t->tor;
1350
1351            tr_torrentSetActivityDate( tor, now );
1352
1353            /* only add this to downloadedCur if we got it from a peer --
1354             * webseeds shouldn't count against our ratio.  As one tracker
1355             * admin put it, "Those pieces are downloaded directly from the
1356             * content distributor, not the peers, it is the tracker's job
1357             * to manage the swarms, not the web server and does not fit
1358             * into the jurisdiction of the tracker." */
1359            if( peer && e->wasPieceData ) {
1360                tor->downloadedCur += e->length;
1361                tr_torrentSetDirty( tor );
1362            }
1363
1364            /* update the stats */
1365            if( e->wasPieceData )
1366                tr_statsAddDownloaded( tor->session, e->length );
1367
1368            /* update our atom */
1369            if( peer && e->wasPieceData )
1370                peer->atom->piece_data_time = now;
1371
1372            break;
1373        }
1374
1375        case TR_PEER_PEER_PROGRESS:
1376        {
1377            if( peer )
1378            {
1379                struct peer_atom * atom = peer->atom;
1380                if( e->progress >= 1.0 ) {
1381                    tordbg( t, "marking peer %s as a seed",
1382                            tr_atomAddrStr( atom ) );
1383                    atom->flags |= ADDED_F_SEED_FLAG;
1384                }
1385            }
1386            break;
1387        }
1388
1389        case TR_PEER_CLIENT_GOT_BLOCK:
1390        {
1391            tr_torrent * tor = t->tor;
1392            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1393
1394            requestListRemove( t, block, peer );
1395
1396            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1397            {
1398                tordbg( t, "we have this block already..." );
1399                clientGotUnwantedBlock( tor, block );
1400                pieceListRemoveRequest( t, block );
1401            }
1402            else
1403            {
1404                tr_cpBlockAdd( &tor->completion, block );
1405                pieceListRemoveRequest( t, block );
1406                tr_torrentSetDirty( tor );
1407
1408                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1409                {
1410                    const tr_piece_index_t p = e->pieceIndex;
1411                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1412
1413                    if( !ok )
1414                    {
1415                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1416                                   (unsigned long)p );
1417                    }
1418
1419                    tr_torrentSetHasPiece( tor, p, ok );
1420                    tr_torrentSetPieceChecked( tor, p, TRUE );
1421                    tr_peerMgrSetBlame( tor, p, ok );
1422
1423                    if( !ok )
1424                    {
1425                        gotBadPiece( t, p );
1426                    }
1427                    else
1428                    {
1429                        int i;
1430                        int peerCount;
1431                        tr_peer ** peers;
1432                        tr_file_index_t fileIndex;
1433
1434                        peerCount = tr_ptrArraySize( &t->peers );
1435                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1436                        for( i=0; i<peerCount; ++i )
1437                            tr_peerMsgsHave( peers[i]->msgs, p );
1438
1439                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1440                            const tr_file * file = &tor->info.files[fileIndex];
1441                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1442                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1443                                    tr_torrentFileCompleted( tor, fileIndex );
1444                        }
1445
1446                        pieceListRemovePiece( t, p );
1447                    }
1448                }
1449
1450                t->needsCompletenessCheck = TRUE;
1451            }
1452            break;
1453        }
1454
1455        case TR_PEER_ERROR:
1456            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1457            {
1458                /* some protocol error from the peer */
1459                peer->doPurge = 1;
1460                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1461                        tr_atomAddrStr( peer->atom ) );
1462            }
1463            else
1464            {
1465                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1466            }
1467            break;
1468
1469        default:
1470            assert( 0 );
1471    }
1472
1473    torrentUnlock( t );
1474}
1475
1476static int
1477getDefaultShelfLife( uint8_t from )
1478{
1479    /* in general, peers obtained from firsthand contact
1480     * are better than those from secondhand, etc etc */
1481    switch( from )
1482    {
1483        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1484        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1485        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1486        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1487        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1488        case TR_PEER_FROM_RESUME   : return 60 * 60;
1489        default                    : return 60 * 60;
1490    }
1491}
1492
1493
1494static void
1495ensureAtomExists( Torrent           * t,
1496                  const tr_address  * addr,
1497                  const tr_port       port,
1498                  const uint8_t       flags,
1499                  const uint8_t       from )
1500{
1501    assert( tr_isAddress( addr ) );
1502    assert( from < TR_PEER_FROM__MAX );
1503
1504    if( getExistingAtom( t, addr ) == NULL )
1505    {
1506        struct peer_atom * a;
1507        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1508
1509        a = tr_new0( struct peer_atom, 1 );
1510        a->addr = *addr;
1511        a->port = port;
1512        a->flags = flags;
1513        a->from = from;
1514        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1515        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1516
1517        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1518    }
1519}
1520
1521static int
1522getMaxPeerCount( const tr_torrent * tor )
1523{
1524    return tor->maxConnectedPeers;
1525}
1526
1527static int
1528getPeerCount( const Torrent * t )
1529{
1530    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1531}
1532
1533/* FIXME: this is kind of a mess. */
1534static tr_bool
1535myHandshakeDoneCB( tr_handshake  * handshake,
1536                   tr_peerIo     * io,
1537                   tr_bool         isConnected,
1538                   const uint8_t * peer_id,
1539                   void          * vmanager )
1540{
1541    tr_bool            ok = isConnected;
1542    tr_bool            success = FALSE;
1543    tr_port            port;
1544    const tr_address * addr;
1545    tr_peerMgr       * manager = vmanager;
1546    Torrent          * t;
1547    tr_handshake     * ours;
1548
1549    assert( io );
1550    assert( tr_isBool( ok ) );
1551
1552    t = tr_peerIoHasTorrentHash( io )
1553        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1554        : NULL;
1555
1556    if( tr_peerIoIsIncoming ( io ) )
1557        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1558                                        handshake, handshakeCompare );
1559    else if( t )
1560        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1561                                        handshake, handshakeCompare );
1562    else
1563        ours = handshake;
1564
1565    assert( ours );
1566    assert( ours == handshake );
1567
1568    if( t )
1569        torrentLock( t );
1570
1571    addr = tr_peerIoGetAddress( io, &port );
1572
1573    if( !ok || !t || !t->isRunning )
1574    {
1575        if( t )
1576        {
1577            struct peer_atom * atom = getExistingAtom( t, addr );
1578            if( atom )
1579                ++atom->numFails;
1580        }
1581    }
1582    else /* looking good */
1583    {
1584        struct peer_atom * atom;
1585
1586        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1587        atom = getExistingAtom( t, addr );
1588        atom->time = tr_time( );
1589        atom->piece_data_time = 0;
1590
1591        if( atom->myflags & MYFLAG_BANNED )
1592        {
1593            tordbg( t, "banned peer %s tried to reconnect",
1594                    tr_atomAddrStr( atom ) );
1595        }
1596        else if( tr_peerIoIsIncoming( io )
1597               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1598
1599        {
1600        }
1601        else
1602        {
1603            tr_peer * peer = atom->peer;
1604
1605            if( peer ) /* we already have this peer */
1606            {
1607            }
1608            else
1609            {
1610                peer = getPeer( t, atom );
1611                tr_free( peer->client );
1612
1613                if( !peer_id )
1614                    peer->client = NULL;
1615                else {
1616                    char client[128];
1617                    tr_clientForId( client, sizeof( client ), peer_id );
1618                    peer->client = tr_strdup( client );
1619                }
1620
1621                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1622                                                                balanced by our unref in peerDestructor()  */
1623                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1624                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1625
1626                success = TRUE;
1627            }
1628        }
1629    }
1630
1631    if( t )
1632        torrentUnlock( t );
1633
1634    return success;
1635}
1636
1637void
1638tr_peerMgrAddIncoming( tr_peerMgr * manager,
1639                       tr_address * addr,
1640                       tr_port      port,
1641                       int          socket )
1642{
1643    tr_session * session;
1644
1645    managerLock( manager );
1646
1647    assert( tr_isSession( manager->session ) );
1648    session = manager->session;
1649
1650    if( tr_sessionIsAddressBlocked( session, addr ) )
1651    {
1652        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1653        tr_netClose( session, socket );
1654    }
1655    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1656    {
1657        tr_netClose( session, socket );
1658    }
1659    else /* we don't have a connection to them yet... */
1660    {
1661        tr_peerIo *    io;
1662        tr_handshake * handshake;
1663
1664        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1665
1666        handshake = tr_handshakeNew( io,
1667                                     session->encryptionMode,
1668                                     myHandshakeDoneCB,
1669                                     manager );
1670
1671        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1672
1673        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1674                                 handshakeCompare );
1675    }
1676
1677    managerUnlock( manager );
1678}
1679
1680static tr_bool
1681tr_isPex( const tr_pex * pex )
1682{
1683    return pex && tr_isAddress( &pex->addr );
1684}
1685
1686void
1687tr_peerMgrAddPex( tr_torrent   *  tor,
1688                  uint8_t         from,
1689                  const tr_pex *  pex )
1690{
1691    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1692    {
1693        Torrent * t = tor->torrentPeers;
1694        managerLock( t->manager );
1695
1696        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1697            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1698                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1699
1700        managerUnlock( t->manager );
1701    }
1702}
1703
1704tr_pex *
1705tr_peerMgrCompactToPex( const void *    compact,
1706                        size_t          compactLen,
1707                        const uint8_t * added_f,
1708                        size_t          added_f_len,
1709                        size_t *        pexCount )
1710{
1711    size_t          i;
1712    size_t          n = compactLen / 6;
1713    const uint8_t * walk = compact;
1714    tr_pex *        pex = tr_new0( tr_pex, n );
1715
1716    for( i = 0; i < n; ++i )
1717    {
1718        pex[i].addr.type = TR_AF_INET;
1719        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1720        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1721        if( added_f && ( n == added_f_len ) )
1722            pex[i].flags = added_f[i];
1723    }
1724
1725    *pexCount = n;
1726    return pex;
1727}
1728
1729tr_pex *
1730tr_peerMgrCompact6ToPex( const void    * compact,
1731                         size_t          compactLen,
1732                         const uint8_t * added_f,
1733                         size_t          added_f_len,
1734                         size_t        * pexCount )
1735{
1736    size_t          i;
1737    size_t          n = compactLen / 18;
1738    const uint8_t * walk = compact;
1739    tr_pex *        pex = tr_new0( tr_pex, n );
1740
1741    for( i = 0; i < n; ++i )
1742    {
1743        pex[i].addr.type = TR_AF_INET6;
1744        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1745        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1746        if( added_f && ( n == added_f_len ) )
1747            pex[i].flags = added_f[i];
1748    }
1749
1750    *pexCount = n;
1751    return pex;
1752}
1753
1754tr_pex *
1755tr_peerMgrArrayToPex( const void * array,
1756                      size_t       arrayLen,
1757                      size_t      * pexCount )
1758{
1759    size_t          i;
1760    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1761    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1762    const uint8_t * walk = array;
1763    tr_pex        * pex = tr_new0( tr_pex, n );
1764
1765    for( i = 0 ; i < n ; i++ ) {
1766        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1767        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1768        pex[i].flags = 0x00;
1769        walk += sizeof( tr_address ) + 2;
1770    }
1771
1772    *pexCount = n;
1773    return pex;
1774}
1775
1776/**
1777***
1778**/
1779
1780void
1781tr_peerMgrSetBlame( tr_torrent     * tor,
1782                    tr_piece_index_t pieceIndex,
1783                    int              success )
1784{
1785    if( !success )
1786    {
1787        int        peerCount, i;
1788        Torrent *  t = tor->torrentPeers;
1789        tr_peer ** peers;
1790
1791        assert( torrentIsLocked( t ) );
1792
1793        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1794        for( i = 0; i < peerCount; ++i )
1795        {
1796            tr_peer * peer = peers[i];
1797            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1798            {
1799                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1800                        tr_atomAddrStr( peer->atom ),
1801                        pieceIndex, (int)peer->strikes + 1 );
1802                addStrike( t, peer );
1803            }
1804        }
1805    }
1806}
1807
1808int
1809tr_pexCompare( const void * va, const void * vb )
1810{
1811    const tr_pex * a = va;
1812    const tr_pex * b = vb;
1813    int i;
1814
1815    assert( tr_isPex( a ) );
1816    assert( tr_isPex( b ) );
1817
1818    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1819        return i;
1820
1821    if( a->port != b->port )
1822        return a->port < b->port ? -1 : 1;
1823
1824    return 0;
1825}
1826
1827#if 0
1828static int
1829peerPrefersCrypto( const tr_peer * peer )
1830{
1831    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1832        return TRUE;
1833
1834    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1835        return FALSE;
1836
1837    return tr_peerIoIsEncrypted( peer->io );
1838}
1839#endif
1840
1841/* better goes first */
1842static int
1843compareAtomsByUsefulness( const void * va, const void *vb )
1844{
1845    const struct peer_atom * a = * (const struct peer_atom**) va;
1846    const struct peer_atom * b = * (const struct peer_atom**) vb;
1847
1848    assert( tr_isAtom( a ) );
1849    assert( tr_isAtom( b ) );
1850
1851    if( a->piece_data_time != b->piece_data_time )
1852        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1853    if( a->from != b->from )
1854        return a->from < b->from ? -1 : 1;
1855    if( a->numFails != b->numFails )
1856        return a->numFails < b->numFails ? -1 : 1;
1857
1858    return 0;
1859}
1860
1861int
1862tr_peerMgrGetPeers( tr_torrent   * tor,
1863                    tr_pex      ** setme_pex,
1864                    uint8_t        af,
1865                    uint8_t        list_mode,
1866                    int            maxCount )
1867{
1868    int i;
1869    int n;
1870    int count = 0;
1871    int atomCount = 0;
1872    const Torrent * t = tor->torrentPeers;
1873    struct peer_atom ** atoms = NULL;
1874    tr_pex * pex;
1875    tr_pex * walk;
1876
1877    assert( tr_isTorrent( tor ) );
1878    assert( setme_pex != NULL );
1879    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1880    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1881
1882    managerLock( t->manager );
1883
1884    /**
1885    ***  build a list of atoms
1886    **/
1887
1888    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1889    {
1890        int i;
1891        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1892        atomCount = tr_ptrArraySize( &t->peers );
1893        atoms = tr_new( struct peer_atom *, atomCount );
1894        for( i=0; i<atomCount; ++i )
1895            atoms[i] = peers[i]->atom;
1896    }
1897    else /* TR_PEERS_ALL */
1898    {
1899        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1900        atomCount = tr_ptrArraySize( &t->pool );
1901        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1902    }
1903
1904    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1905
1906    /**
1907    ***  add the first N of them into our return list
1908    **/
1909
1910    n = MIN( atomCount, maxCount );
1911    pex = walk = tr_new0( tr_pex, n );
1912
1913    for( i=0; i<atomCount && count<n; ++i )
1914    {
1915        const struct peer_atom * atom = atoms[i];
1916        if( atom->addr.type == af )
1917        {
1918            assert( tr_isAddress( &atom->addr ) );
1919            walk->addr = atom->addr;
1920            walk->port = atom->port;
1921            walk->flags = atom->flags;
1922            ++count;
1923            ++walk;
1924        }
1925    }
1926
1927    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1928
1929    assert( ( walk - pex ) == count );
1930    *setme_pex = pex;
1931
1932    /* cleanup */
1933    tr_free( atoms );
1934    managerUnlock( t->manager );
1935    return count;
1936}
1937
1938static void atomPulse      ( int, short, void * );
1939static void bandwidthPulse ( int, short, void * );
1940static void rechokePulse   ( int, short, void * );
1941static void reconnectPulse ( int, short, void * );
1942
1943static struct event *
1944createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1945{
1946    struct event * timer = tr_new0( struct event, 1 );
1947    evtimer_set( timer, callback, cbdata );
1948    tr_timerAddMsec( timer, msec );
1949    return timer;
1950}
1951
1952static void
1953ensureMgrTimersExist( struct tr_peerMgr * m )
1954{
1955    if( m->atomTimer == NULL )
1956        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1957
1958    if( m->bandwidthTimer == NULL )
1959        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1960
1961    if( m->rechokeTimer == NULL )
1962        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1963
1964    if( m->reconnectTimer == NULL )
1965        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1966
1967    if( m->refillUpkeepTimer == NULL )
1968        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1969}
1970
1971void
1972tr_peerMgrStartTorrent( tr_torrent * tor )
1973{
1974    Torrent * t = tor->torrentPeers;
1975
1976    assert( t != NULL );
1977    managerLock( t->manager );
1978    ensureMgrTimersExist( t->manager );
1979
1980    t->isRunning = TRUE;
1981
1982    rechokePulse( 0, 0, t->manager );
1983    managerUnlock( t->manager );
1984}
1985
1986static void
1987stopTorrent( Torrent * t )
1988{
1989    int i, n;
1990
1991    assert( torrentIsLocked( t ) );
1992
1993    t->isRunning = FALSE;
1994
1995    /* disconnect the peers. */
1996    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1997        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1998    tr_ptrArrayClear( &t->peers );
1999
2000    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
2001     * which removes the handshake from t->outgoingHandshakes... */
2002    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2003        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2004}
2005
2006void
2007tr_peerMgrStopTorrent( tr_torrent * tor )
2008{
2009    Torrent * t = tor->torrentPeers;
2010
2011    managerLock( t->manager );
2012
2013    stopTorrent( t );
2014
2015    managerUnlock( t->manager );
2016}
2017
2018void
2019tr_peerMgrAddTorrent( tr_peerMgr * manager,
2020                      tr_torrent * tor )
2021{
2022    managerLock( manager );
2023
2024    assert( tor );
2025    assert( tor->torrentPeers == NULL );
2026
2027    tor->torrentPeers = torrentConstructor( manager, tor );
2028
2029    managerUnlock( manager );
2030}
2031
2032void
2033tr_peerMgrRemoveTorrent( tr_torrent * tor )
2034{
2035    tr_torrentLock( tor );
2036
2037    stopTorrent( tor->torrentPeers );
2038    torrentDestructor( tor->torrentPeers );
2039
2040    tr_torrentUnlock( tor );
2041}
2042
2043void
2044tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2045                               int8_t           * tab,
2046                               unsigned int       tabCount )
2047{
2048    tr_piece_index_t   i;
2049    const Torrent *    t;
2050    float              interval;
2051    tr_bool            isSeed;
2052    int                peerCount;
2053    const tr_peer **   peers;
2054    tr_torrentLock( tor );
2055
2056    t = tor->torrentPeers;
2057    tor = t->tor;
2058    interval = tor->info.pieceCount / (float)tabCount;
2059    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2060    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2061    peerCount = tr_ptrArraySize( &t->peers );
2062
2063    memset( tab, 0, tabCount );
2064
2065    for( i = 0; tor && i < tabCount; ++i )
2066    {
2067        const int piece = i * interval;
2068
2069        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2070            tab[i] = -1;
2071        else if( peerCount ) {
2072            int j;
2073            for( j = 0; j < peerCount; ++j )
2074                if( tr_bitsetHas( &peers[j]->have, i ) )
2075                    ++tab[i];
2076        }
2077    }
2078
2079    tr_torrentUnlock( tor );
2080}
2081
2082/* Returns the pieces that are available from peers */
2083tr_bitfield*
2084tr_peerMgrGetAvailable( const tr_torrent * tor )
2085{
2086    int i;
2087    int peerCount;
2088    Torrent * t = tor->torrentPeers;
2089    const tr_peer ** peers;
2090    tr_bitfield * pieces;
2091    managerLock( t->manager );
2092
2093    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2094    peerCount = tr_ptrArraySize( &t->peers );
2095    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2096    for( i=0; i<peerCount; ++i )
2097        tr_bitsetOr( pieces, &peers[i]->have );
2098
2099    managerUnlock( t->manager );
2100    return pieces;
2101}
2102
2103void
2104tr_peerMgrTorrentStats( tr_torrent       * tor,
2105                        int              * setmePeersKnown,
2106                        int              * setmePeersConnected,
2107                        int              * setmeSeedsConnected,
2108                        int              * setmeWebseedsSendingToUs,
2109                        int              * setmePeersSendingToUs,
2110                        int              * setmePeersGettingFromUs,
2111                        int              * setmePeersFrom )
2112{
2113    int i, size;
2114    const Torrent * t = tor->torrentPeers;
2115    const tr_peer ** peers;
2116    const tr_webseed ** webseeds;
2117
2118    managerLock( t->manager );
2119
2120    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2121    size = tr_ptrArraySize( &t->peers );
2122
2123    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2124    *setmePeersConnected       = 0;
2125    *setmeSeedsConnected       = 0;
2126    *setmePeersGettingFromUs   = 0;
2127    *setmePeersSendingToUs     = 0;
2128    *setmeWebseedsSendingToUs  = 0;
2129
2130    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2131        setmePeersFrom[i] = 0;
2132
2133    for( i=0; i<size; ++i )
2134    {
2135        const tr_peer * peer = peers[i];
2136        const struct peer_atom * atom = peer->atom;
2137
2138        if( peer->io == NULL ) /* not connected */
2139            continue;
2140
2141        ++*setmePeersConnected;
2142
2143        ++setmePeersFrom[atom->from];
2144
2145        if( clientIsDownloadingFrom( tor, peer ) )
2146            ++*setmePeersSendingToUs;
2147
2148        if( clientIsUploadingTo( peer ) )
2149            ++*setmePeersGettingFromUs;
2150
2151        if( atom->flags & ADDED_F_SEED_FLAG )
2152            ++*setmeSeedsConnected;
2153    }
2154
2155    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2156    size = tr_ptrArraySize( &t->webseeds );
2157    for( i=0; i<size; ++i )
2158        if( tr_webseedIsActive( webseeds[i] ) )
2159            ++*setmeWebseedsSendingToUs;
2160
2161    managerUnlock( t->manager );
2162}
2163
2164float
2165tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2166{
2167    int i;
2168    float tmp;
2169    float ret = 0;
2170
2171    const Torrent * t = tor->torrentPeers;
2172    const int n = tr_ptrArraySize( &t->webseeds );
2173    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2174
2175    for( i=0; i<n; ++i )
2176        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2177            ret += tmp;
2178
2179    return ret;
2180}
2181
2182
2183float*
2184tr_peerMgrWebSpeeds( const tr_torrent * tor )
2185{
2186    const Torrent * t = tor->torrentPeers;
2187    const tr_webseed ** webseeds;
2188    int i;
2189    int webseedCount;
2190    float * ret;
2191    uint64_t now;
2192
2193    assert( t->manager );
2194    managerLock( t->manager );
2195
2196    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2197    webseedCount = tr_ptrArraySize( &t->webseeds );
2198    assert( webseedCount == tor->info.webseedCount );
2199    ret = tr_new0( float, webseedCount );
2200    now = tr_date( );
2201
2202    for( i=0; i<webseedCount; ++i )
2203        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2204            ret[i] = -1.0;
2205
2206    managerUnlock( t->manager );
2207    return ret;
2208}
2209
2210double
2211tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2212{
2213    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2214}
2215
2216
2217struct tr_peer_stat *
2218tr_peerMgrPeerStats( const tr_torrent    * tor,
2219                     int                 * setmeCount )
2220{
2221    int i, size;
2222    const Torrent * t = tor->torrentPeers;
2223    const tr_peer ** peers;
2224    tr_peer_stat * ret;
2225    uint64_t now;
2226
2227    assert( t->manager );
2228    managerLock( t->manager );
2229
2230    size = tr_ptrArraySize( &t->peers );
2231    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2232    ret = tr_new0( tr_peer_stat, size );
2233    now = tr_date( );
2234
2235    for( i=0; i<size; ++i )
2236    {
2237        char *                   pch;
2238        const tr_peer *          peer = peers[i];
2239        const struct peer_atom * atom = peer->atom;
2240        tr_peer_stat *           stat = ret + i;
2241
2242        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2243        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2244                   sizeof( stat->client ) );
2245        stat->port                = ntohs( peer->atom->port );
2246        stat->from                = atom->from;
2247        stat->progress            = peer->progress;
2248        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2249        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2250        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2251        stat->peerIsChoked        = peer->peerIsChoked;
2252        stat->peerIsInterested    = peer->peerIsInterested;
2253        stat->clientIsChoked      = peer->clientIsChoked;
2254        stat->clientIsInterested  = peer->clientIsInterested;
2255        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2256        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2257        stat->isUploadingTo       = clientIsUploadingTo( peer );
2258        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2259        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2260        stat->pendingReqsToClient = peer->pendingReqsToClient;
2261
2262        pch = stat->flagStr;
2263        if( t->optimistic == peer ) *pch++ = 'O';
2264        if( stat->isDownloadingFrom ) *pch++ = 'D';
2265        else if( stat->clientIsInterested ) *pch++ = 'd';
2266        if( stat->isUploadingTo ) *pch++ = 'U';
2267        else if( stat->peerIsInterested ) *pch++ = 'u';
2268        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2269        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2270        if( stat->isEncrypted ) *pch++ = 'E';
2271        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2272        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2273        if( stat->isIncoming ) *pch++ = 'I';
2274        *pch = '\0';
2275    }
2276
2277    *setmeCount = size;
2278
2279    managerUnlock( t->manager );
2280    return ret;
2281}
2282
2283/**
2284***
2285**/
2286
2287struct ChokeData
2288{
2289    tr_bool         doUnchoke;
2290    tr_bool         isInterested;
2291    tr_bool         isChoked;
2292    int             rate;
2293    tr_peer *       peer;
2294};
2295
2296static int
2297compareChoke( const void * va,
2298              const void * vb )
2299{
2300    const struct ChokeData * a = va;
2301    const struct ChokeData * b = vb;
2302
2303    if( a->rate != b->rate ) /* prefer higher overall speeds */
2304        return a->rate > b->rate ? -1 : 1;
2305
2306    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2307        return a->isChoked ? 1 : -1;
2308
2309    return 0;
2310}
2311
2312static int
2313isNew( const tr_peer * peer )
2314{
2315    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2316}
2317
2318static int
2319isSame( const tr_peer * peer )
2320{
2321    return peer && peer->client && strstr( peer->client, "Transmission" );
2322}
2323
2324/**
2325***
2326**/
2327
2328static void
2329rechokeTorrent( Torrent * t, const uint64_t now )
2330{
2331    int i, size, unchokedInterested;
2332    const int peerCount = tr_ptrArraySize( &t->peers );
2333    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2334    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2335    const tr_session * session = t->manager->session;
2336    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2337
2338    assert( torrentIsLocked( t ) );
2339
2340    /* sort the peers by preference and rate */
2341    for( i = 0, size = 0; i < peerCount; ++i )
2342    {
2343        tr_peer * peer = peers[i];
2344        struct peer_atom * atom = peer->atom;
2345
2346        if( peer->progress >= 1.0 ) /* choke all seeds */
2347        {
2348            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2349        }
2350        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2351        {
2352            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2353        }
2354        else if( chokeAll ) /* choke everyone if we're not uploading */
2355        {
2356            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2357        }
2358        else
2359        {
2360            struct ChokeData * n = &choke[size++];
2361            n->peer         = peer;
2362            n->isInterested = peer->peerIsInterested;
2363            n->isChoked     = peer->peerIsChoked;
2364            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2365        }
2366    }
2367
2368    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2369
2370    /**
2371     * Reciprocation and number of uploads capping is managed by unchoking
2372     * the N peers which have the best upload rate and are interested.
2373     * This maximizes the client's download rate. These N peers are
2374     * referred to as downloaders, because they are interested in downloading
2375     * from the client.
2376     *
2377     * Peers which have a better upload rate (as compared to the downloaders)
2378     * but aren't interested get unchoked. If they become interested, the
2379     * downloader with the worst upload rate gets choked. If a client has
2380     * a complete file, it uses its upload rate rather than its download
2381     * rate to decide which peers to unchoke.
2382     */
2383    unchokedInterested = 0;
2384    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2385        choke[i].doUnchoke = 1;
2386        if( choke[i].isInterested )
2387            ++unchokedInterested;
2388    }
2389
2390    /* optimistic unchoke */
2391    if( i < size )
2392    {
2393        int n;
2394        struct ChokeData * c;
2395        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2396
2397        for( ; i<size; ++i )
2398        {
2399            if( choke[i].isInterested )
2400            {
2401                const tr_peer * peer = choke[i].peer;
2402                int x = 1, y;
2403                if( isNew( peer ) ) x *= 3;
2404                if( isSame( peer ) ) x *= 3;
2405                for( y=0; y<x; ++y )
2406                    tr_ptrArrayAppend( &randPool, &choke[i] );
2407            }
2408        }
2409
2410        if(( n = tr_ptrArraySize( &randPool )))
2411        {
2412            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2413            c->doUnchoke = 1;
2414            t->optimistic = c->peer;
2415        }
2416
2417        tr_ptrArrayDestruct( &randPool, NULL );
2418    }
2419
2420    for( i=0; i<size; ++i )
2421        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2422
2423    /* cleanup */
2424    tr_free( choke );
2425}
2426
2427static void
2428rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2429{
2430    uint64_t now;
2431    tr_torrent * tor = NULL;
2432    tr_peerMgr * mgr = vmgr;
2433    managerLock( mgr );
2434
2435    now = tr_date( );
2436    while(( tor = tr_torrentNext( mgr->session, tor )))
2437        if( tor->isRunning )
2438            rechokeTorrent( tor->torrentPeers, now );
2439
2440    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2441    managerUnlock( mgr );
2442}
2443
2444/***
2445****
2446****  Life and Death
2447****
2448***/
2449
2450typedef enum
2451{
2452    TR_CAN_KEEP,
2453    TR_CAN_CLOSE,
2454    TR_MUST_CLOSE,
2455}
2456tr_close_type_t;
2457
2458static tr_close_type_t
2459shouldPeerBeClosed( const Torrent    * t,
2460                    const tr_peer    * peer,
2461                    int                peerCount,
2462                    const time_t       now )
2463{
2464    const tr_torrent *       tor = t->tor;
2465    const struct peer_atom * atom = peer->atom;
2466
2467    /* if it's marked for purging, close it */
2468    if( peer->doPurge )
2469    {
2470        tordbg( t, "purging peer %s because its doPurge flag is set",
2471                tr_atomAddrStr( atom ) );
2472        return TR_MUST_CLOSE;
2473    }
2474
2475    /* if we're seeding and the peer has everything we have,
2476     * and enough time has passed for a pex exchange, then disconnect */
2477    if( tr_torrentIsSeed( tor ) )
2478    {
2479        int peerHasEverything;
2480        if( atom->flags & ADDED_F_SEED_FLAG )
2481            peerHasEverything = TRUE;
2482        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2483            peerHasEverything = FALSE;
2484        else {
2485            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2486            tr_bitsetDifference( tmp, &peer->have );
2487            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2488            tr_bitfieldFree( tmp );
2489        }
2490
2491        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2492        {
2493            tordbg( t, "purging peer %s because we're both seeds",
2494                    tr_atomAddrStr( atom ) );
2495            return TR_MUST_CLOSE;
2496        }
2497    }
2498
2499    /* disconnect if it's been too long since piece data has been transferred.
2500     * this is on a sliding scale based on number of available peers... */
2501    {
2502        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2503        /* if we have >= relaxIfFewerThan, strictness is 100%.
2504         * if we have zero connections, strictness is 0% */
2505        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2506                               ? 1.0
2507                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2508        const int lo = MIN_UPLOAD_IDLE_SECS;
2509        const int hi = MAX_UPLOAD_IDLE_SECS;
2510        const int limit = hi - ( ( hi - lo ) * strictness );
2511        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2512/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2513        if( idleTime > limit ) {
2514            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2515                       tr_atomAddrStr( atom ), idleTime );
2516            return TR_CAN_CLOSE;
2517        }
2518    }
2519
2520    return TR_CAN_KEEP;
2521}
2522
2523static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2524
2525static tr_peer **
2526getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2527{
2528    int i, peerCount, outsize;
2529    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2530    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2531
2532    assert( torrentIsLocked( t ) );
2533
2534    for( i = outsize = 0; i < peerCount; ++i )
2535        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2536            ret[outsize++] = peers[i];
2537
2538    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2539
2540    *setmeSize = outsize;
2541    return ret;
2542}
2543
2544static int
2545compareCandidates( const void * va, const void * vb )
2546{
2547    const struct peer_atom * a = *(const struct peer_atom**) va;
2548    const struct peer_atom * b = *(const struct peer_atom**) vb;
2549
2550    /* <Charles> Here we would probably want to try reconnecting to
2551     * peers that had most recently given us data. Lots of users have
2552     * trouble with resets due to their routers and/or ISPs. This way we
2553     * can quickly recover from an unwanted reset. So we sort
2554     * piece_data_time in descending order.
2555     */
2556
2557    if( a->piece_data_time != b->piece_data_time )
2558        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2559
2560    if( a->numFails != b->numFails )
2561        return a->numFails < b->numFails ? -1 : 1;
2562
2563    if( a->time != b->time )
2564        return a->time < b->time ? -1 : 1;
2565
2566    /* In order to avoid fragmenting the swarm, peers from trackers and
2567     * from the DHT should be preferred to peers from PEX. */
2568    if( a->from != b->from )
2569        return a->from < b->from ? -1 : 1;
2570
2571    return 0;
2572}
2573
2574static int
2575getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2576{
2577    int sec;
2578
2579    /* if we were recently connected to this peer and transferring piece
2580     * data, try to reconnect to them sooner rather that later -- we don't
2581     * want network troubles to get in the way of a good peer. */
2582    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2583        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2584
2585    /* don't allow reconnects more often than our minimum */
2586    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2587        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2588
2589    /* otherwise, the interval depends on how many times we've tried
2590     * and failed to connect to the peer */
2591    else switch( atom->numFails ) {
2592        case 0: sec = 0; break;
2593        case 1: sec = 5; break;
2594        case 2: sec = 2 * 60; break;
2595        case 3: sec = 15 * 60; break;
2596        case 4: sec = 30 * 60; break;
2597        case 5: sec = 60 * 60; break;
2598        default: sec = 120 * 60; break;
2599    }
2600
2601    return sec;
2602}
2603
2604static struct peer_atom **
2605getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2606{
2607    int                 i, atomCount, retCount;
2608    struct peer_atom ** atoms;
2609    struct peer_atom ** ret;
2610    const int           seed = tr_torrentIsSeed( t->tor );
2611
2612    assert( torrentIsLocked( t ) );
2613
2614    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2615    ret = tr_new( struct peer_atom*, atomCount );
2616    for( i = retCount = 0; i < atomCount; ++i )
2617    {
2618        int                interval;
2619        struct peer_atom * atom = atoms[i];
2620
2621        /* peer fed us too much bad data ... we only keep it around
2622         * now to weed it out in case someone sends it to us via pex */
2623        if( atom->myflags & MYFLAG_BANNED )
2624            continue;
2625
2626        /* peer was unconnectable before, so we're not going to keep trying.
2627         * this is needs a separate flag from `banned', since if they try
2628         * to connect to us later, we'll let them in */
2629        if( atom->myflags & MYFLAG_UNREACHABLE )
2630            continue;
2631
2632        /* no need to connect if we're both seeds... */
2633        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2634                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2635            continue;
2636
2637        /* don't reconnect too often */
2638        interval = getReconnectIntervalSecs( atom, now );
2639        if( ( now - atom->time ) < interval )
2640        {
2641            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2642                    i, tr_atomAddrStr( atom ), interval );
2643            continue;
2644        }
2645
2646        /* Don't connect to peers in our blocklist */
2647        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2648            continue;
2649
2650        /* we don't need two connections to the same peer... */
2651        if( peerIsInUse( t, atom ) )
2652            continue;
2653
2654        ret[retCount++] = atom;
2655    }
2656
2657    if( retCount != 0 )
2658        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2659    *setmeSize = retCount;
2660    return ret;
2661}
2662
2663static void
2664closePeer( Torrent * t, tr_peer * peer )
2665{
2666    struct peer_atom * atom;
2667
2668    assert( t != NULL );
2669    assert( peer != NULL );
2670
2671    atom = peer->atom;
2672
2673    /* if we transferred piece data, then they might be good peers,
2674       so reset their `numFails' weight to zero.  otherwise we connected
2675       to them fruitlessly, so mark it as another fail */
2676    if( atom->piece_data_time )
2677        atom->numFails = 0;
2678    else
2679        ++atom->numFails;
2680
2681    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2682    removePeer( t, peer );
2683}
2684
2685/* Make a lot of slots available to newly-running torrents...
2686 * once they reach steady state, they shouldn't need as many */
2687static int
2688maxNewPeersPerPulse( const Torrent * t )
2689{
2690    double runTime;
2691    const tr_torrent * tor = t->tor;
2692
2693    assert( tr_isTorrent( tor ) );
2694
2695    runTime = difftime( tr_time( ), tor->startDate );
2696    if( runTime > 480 ) return 1;
2697    if( runTime > 240 ) return 2;
2698    if( runTime > 120 ) return 3;
2699    return 4;
2700}
2701
2702static void
2703reconnectTorrent( Torrent * t )
2704{
2705    static time_t prevTime = 0;
2706    static int    newConnectionsThisSecond = 0;
2707    const time_t  now = tr_time( );
2708
2709    if( prevTime != now )
2710    {
2711        prevTime = now;
2712        newConnectionsThisSecond = 0;
2713    }
2714
2715    if( !t->isRunning )
2716    {
2717        removeAllPeers( t );
2718    }
2719    else
2720    {
2721        int i;
2722        int mustCloseCount;
2723        int maxCandidates;
2724        struct tr_peer ** mustClose;
2725        const int maxPerPulse = maxNewPeersPerPulse( t );
2726
2727        /* disconnect the really bad peers */
2728        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2729        for( i=0; i<mustCloseCount; ++i )
2730            closePeer( t, mustClose[i] );
2731        tr_free( mustClose );
2732
2733        /* decide how many peers can we try to add in this pass */
2734   
2735        maxCandidates = maxPerPulse;
2736        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2737            maxCandidates /= 2;
2738        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2739        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2740
2741        /* select the best candidates, if they are requested */
2742        if( maxCandidates == 0 )
2743        {
2744            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2745                       "NO connection candidates needed, %d atoms, "
2746                       "max per pulse is %d",
2747                       t->tor->info.name, mustCloseCount,
2748                       tr_ptrArraySize( &t->pool ),
2749                       maxPerPulse );
2750
2751            tordbg( t, "maxCandidates is %d, maxPerPulse is %d, "
2752                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2753                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2754                       maxCandidates, maxPerPulse,
2755                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2756                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2757        }
2758        else
2759        {
2760            int canCloseCount = 0;
2761            int candidateCount;
2762            struct peer_atom ** candidates;
2763
2764            candidates = getPeerCandidates( t, now, &candidateCount );
2765            maxCandidates = MIN( maxCandidates, candidateCount );
2766
2767            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2768            if( maxCandidates != 0 )
2769            {
2770                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2771                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2772                   closePeer( t, canClose[i] );
2773                tr_free( canClose );
2774            }
2775
2776            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2777                       "%d can-close connections, %d connection candidates, "
2778                       "%d atoms, max per pulse is %d",
2779                       t->tor->info.name, mustCloseCount,
2780                       canCloseCount, candidateCount,
2781                       tr_ptrArraySize( &t->pool ), maxPerPulse );
2782
2783            tordbg( t, "candidateCount is %d, maxPerPulse is %d,"
2784                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2785                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2786                       candidateCount, maxPerPulse,
2787                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2788                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2789
2790            /* add some new ones */
2791            for( i=0; i<maxCandidates; ++i )
2792            {
2793                tr_peerMgr        * mgr = t->manager;
2794                struct peer_atom  * atom = candidates[i];
2795                tr_peerIo         * io;
2796
2797                tordbg( t, "Starting an OUTGOING connection with %s",
2798                        tr_atomAddrStr( atom ) );
2799
2800                io = tr_peerIoNewOutgoing( mgr->session,
2801                                           mgr->session->bandwidth,
2802                                           &atom->addr,
2803                                           atom->port,
2804                                           t->tor->info.hash,
2805                                           t->tor->completeness == TR_SEED );
2806
2807                if( io == NULL )
2808                {
2809                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2810                            tr_atomAddrStr( atom ) );
2811                    atom->myflags |= MYFLAG_UNREACHABLE;
2812                }
2813                else
2814                {
2815                    tr_handshake * handshake = tr_handshakeNew( io,
2816                                                                mgr->session->encryptionMode,
2817                                                                myHandshakeDoneCB,
2818                                                                mgr );
2819
2820                    assert( tr_peerIoGetTorrentHash( io ) );
2821
2822                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2823
2824                    ++newConnectionsThisSecond;
2825
2826                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2827                                             handshakeCompare );
2828                }
2829
2830                atom->time = now;
2831            }
2832            tr_free( candidates );
2833        }
2834    }
2835}
2836
2837struct peer_liveliness
2838{
2839    tr_peer * peer;
2840    void * clientData;
2841    time_t pieceDataTime;
2842    time_t time;
2843    int speed;
2844    tr_bool doPurge;
2845};
2846
2847static int
2848comparePeerLiveliness( const void * va, const void * vb )
2849{
2850    const struct peer_liveliness * a = va;
2851    const struct peer_liveliness * b = vb;
2852
2853    if( a->doPurge != b->doPurge )
2854        return a->doPurge ? 1 : -1;
2855
2856    if( a->speed != b->speed ) /* faster goes first */
2857        return a->speed > b->speed ? -1 : 1;
2858
2859    /* the one to give us data more recently goes first */
2860    if( a->pieceDataTime != b->pieceDataTime )
2861        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2862
2863    /* the one we connected to most recently goes first */
2864    if( a->time != b->time )
2865        return a->time > b->time ? -1 : 1;
2866
2867    return 0;
2868}
2869
2870static int
2871comparePeerLivelinessReverse( const void * va, const void * vb )
2872{
2873    return -comparePeerLiveliness (va, vb);
2874}
2875
2876static void
2877sortPeersByLivelinessImpl( tr_peer  ** peers,
2878                           void     ** clientData,
2879                           int         n,
2880                           uint64_t    now,
2881                           int (*compare) ( const void *va, const void *vb ) )
2882{
2883    int i;
2884    struct peer_liveliness *lives, *l;
2885
2886    /* build a sortable array of peer + extra info */
2887    lives = l = tr_new0( struct peer_liveliness, n );
2888    for( i=0; i<n; ++i, ++l )
2889    {
2890        tr_peer * p = peers[i];
2891        l->peer = p;
2892        l->doPurge = p->doPurge;
2893        l->pieceDataTime = p->atom->piece_data_time;
2894        l->time = p->atom->time;
2895        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2896                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2897        if( clientData )
2898            l->clientData = clientData[i];
2899    }
2900
2901    /* sort 'em */
2902    assert( n == ( l - lives ) );
2903    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2904
2905    /* build the peer array */
2906    for( i=0, l=lives; i<n; ++i, ++l ) {
2907        peers[i] = l->peer;
2908        if( clientData )
2909            clientData[i] = l->clientData;
2910    }
2911    assert( n == ( l - lives ) );
2912
2913    /* cleanup */
2914    tr_free( lives );
2915}
2916
2917static void
2918sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2919{
2920    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2921}
2922
2923static void
2924sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2925{
2926    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2927}
2928
2929
2930static void
2931enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2932{
2933    int n = tr_ptrArraySize( &t->peers );
2934    const int max = tr_torrentGetPeerLimit( t->tor );
2935    if( n > max )
2936    {
2937        void * base = tr_ptrArrayBase( &t->peers );
2938        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2939        sortPeersByLiveliness( peers, NULL, n, now );
2940        while( n > max )
2941            closePeer( t, peers[--n] );
2942        tr_free( peers );
2943    }
2944}
2945
2946static void
2947enforceSessionPeerLimit( tr_session * session, uint64_t now )
2948{
2949    int n = 0;
2950    tr_torrent * tor = NULL;
2951    const int max = tr_sessionGetPeerLimit( session );
2952
2953    /* count the total number of peers */
2954    while(( tor = tr_torrentNext( session, tor )))
2955        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2956
2957    /* if there are too many, prune out the worst */
2958    if( n > max )
2959    {
2960        tr_peer ** peers = tr_new( tr_peer*, n );
2961        Torrent ** torrents = tr_new( Torrent*, n );
2962
2963        /* populate the peer array */
2964        n = 0;
2965        tor = NULL;
2966        while(( tor = tr_torrentNext( session, tor ))) {
2967            int i;
2968            Torrent * t = tor->torrentPeers;
2969            const int tn = tr_ptrArraySize( &t->peers );
2970            for( i=0; i<tn; ++i, ++n ) {
2971                peers[n] = tr_ptrArrayNth( &t->peers, i );
2972                torrents[n] = t;
2973            }
2974        }
2975
2976        /* sort 'em */
2977        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2978
2979        /* cull out the crappiest */
2980        while( n-- > max )
2981            closePeer( torrents[n], peers[n] );
2982
2983        /* cleanup */
2984        tr_free( torrents );
2985        tr_free( peers );
2986    }
2987}
2988
2989struct reconnectTorrentStruct
2990{
2991    tr_torrent * torrent;
2992    int salt;
2993};
2994
2995static int
2996compareReconnectTorrents( const void * va, const void * vb )
2997{
2998    int ai, bi;
2999    const struct reconnectTorrentStruct * a = va;
3000    const struct reconnectTorrentStruct * b = vb;
3001
3002    /* primary key: higher priority goes first */
3003    ai = tr_torrentGetPriority( a->torrent );
3004    bi = tr_torrentGetPriority( b->torrent );
3005    if( ai != bi )
3006        return ai > bi ? -1 : 1;
3007
3008    /* secondary key: since users tend to stare at the screens
3009     * watching their downloads' progress, give downloads a
3010     * first shot at attempting outbound peer connections. */
3011    ai = tr_torrentIsSeed( a->torrent );
3012    bi = tr_torrentIsSeed( b->torrent );
3013    if( ai != bi )
3014        return bi ? -1 : 1;
3015
3016    /* tertiary key: random */
3017    if( a->salt != b->salt )
3018        return a->salt - b->salt;
3019
3020    return 0;
3021}
3022
3023static void
3024reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3025{
3026    tr_torrent * tor;
3027    tr_peerMgr * mgr = vmgr;
3028    struct reconnectTorrentStruct * torrents;
3029    int torrentCount;
3030    int i;
3031    uint64_t now;
3032    managerLock( mgr );
3033
3034    now = tr_date( );
3035
3036    /**
3037    ***  enforce the per-session and per-torrent peer limits
3038    **/
3039
3040    /* if we're over the per-torrent peer limits, cull some peers */
3041    tor = NULL;
3042    while(( tor = tr_torrentNext( mgr->session, tor )))
3043        if( tor->isRunning )
3044            enforceTorrentPeerLimit( tor->torrentPeers, now );
3045
3046    /* if we're over the per-session peer limits, cull some peers */
3047    enforceSessionPeerLimit( mgr->session, now );
3048
3049    /**
3050    ***  try to make new peer connections
3051    **/
3052
3053    torrentCount = 0;
3054    torrents = tr_new( struct reconnectTorrentStruct,
3055                       mgr->session->torrentCount );
3056    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3057        if( tor->isRunning ) {
3058            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3059            r->torrent = tor;
3060            r->salt = tr_cryptoWeakRandInt( 1024 );
3061        }
3062    }
3063    qsort( torrents,
3064           torrentCount, sizeof( struct reconnectTorrentStruct ),
3065           compareReconnectTorrents );
3066    for( i=0; i<torrentCount; ++i )
3067        reconnectTorrent( torrents[i].torrent->torrentPeers );
3068
3069
3070    /* cleanup */
3071    tr_free( torrents );
3072    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3073    managerUnlock( mgr );
3074}
3075
3076/****
3077*****
3078*****  BANDWIDTH ALLOCATION
3079*****
3080****/
3081
3082static void
3083pumpAllPeers( tr_peerMgr * mgr )
3084{
3085    tr_torrent * tor = NULL;
3086
3087    while(( tor = tr_torrentNext( mgr->session, tor )))
3088    {
3089        int j;
3090        Torrent * t = tor->torrentPeers;
3091
3092        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3093        {
3094            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3095            tr_peerMsgsPulse( peer->msgs );
3096        }
3097    }
3098}
3099
3100static void
3101bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3102{
3103    tr_torrent * tor = NULL;
3104    tr_peerMgr * mgr = vmgr;
3105    managerLock( mgr );
3106
3107    /* FIXME: this next line probably isn't necessary... */
3108    pumpAllPeers( mgr );
3109
3110    /* allocate bandwidth to the peers */
3111    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3112    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3113
3114    /* possibly stop torrents that have seeded enough */
3115    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3116        if( tor->needsSeedRatioCheck ) {
3117            tor->needsSeedRatioCheck = FALSE;
3118            tr_torrentCheckSeedRatio( tor );
3119        }
3120    }
3121
3122    /* run the completeness check for any torrents that need it */
3123    tor = NULL;
3124    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3125        if( tor->torrentPeers->needsCompletenessCheck ) {
3126            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3127            tr_torrentRecheckCompleteness( tor );
3128        }
3129    }
3130
3131    /* possibly stop torrents that have an error */
3132    tor = NULL;
3133    while(( tor = tr_torrentNext( mgr->session, tor )))
3134        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3135            tr_torrentStop( tor );
3136
3137    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3138    managerUnlock( mgr );
3139}
3140
3141/***
3142****
3143***/
3144
3145static int
3146compareAtomPtrsByAddress( const void * va, const void *vb )
3147{
3148    const struct peer_atom * a = * (const struct peer_atom**) va;
3149    const struct peer_atom * b = * (const struct peer_atom**) vb;
3150
3151    assert( tr_isAtom( a ) );
3152    assert( tr_isAtom( b ) );
3153
3154    return tr_compareAddresses( &a->addr, &b->addr );
3155}
3156
3157/* best come first, worst go last */
3158static int
3159compareAtomPtrsByShelfDate( const void * va, const void *vb )
3160{
3161    time_t atime;
3162    time_t btime;
3163    const struct peer_atom * a = * (const struct peer_atom**) va;
3164    const struct peer_atom * b = * (const struct peer_atom**) vb;
3165    const int data_time_cutoff_secs = 60 * 60;
3166    const time_t tr_now = tr_time( );
3167
3168    assert( tr_isAtom( a ) );
3169    assert( tr_isAtom( b ) );
3170
3171    /* primary key: the last piece data time *if* it was within the last hour */
3172    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3173    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3174    if( atime != btime )
3175        return atime > btime ? -1 : 1;
3176
3177    /* secondary key: shelf date. */
3178    if( a->shelf_date != b->shelf_date )
3179        return a->shelf_date > b->shelf_date ? -1 : 1;
3180
3181    return 0;
3182}
3183
3184static int
3185getMaxAtomCount( const tr_torrent * tor )
3186{
3187    /* FIXME: this curve should be smoother... */
3188    const int n = tor->maxConnectedPeers;
3189    if( n >= 200 ) return n * 1.5;
3190    if( n >= 100 ) return n * 2;
3191    if( n >=  50 ) return n * 3;
3192    if( n >=  20 ) return n * 5;
3193    return n * 10;
3194}
3195
3196static void
3197atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3198{
3199    tr_torrent * tor = NULL;
3200    tr_peerMgr * mgr = vmgr;
3201    managerLock( mgr );
3202
3203    while(( tor = tr_torrentNext( mgr->session, tor )))
3204    {
3205        int atomCount;
3206        Torrent * t = tor->torrentPeers;
3207        const int maxAtomCount = getMaxAtomCount( tor );
3208        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3209
3210        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3211        {
3212            int i;
3213            int keepCount = 0;
3214            int testCount = 0;
3215            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3216            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3217
3218            /* keep the ones that are in use */
3219            for( i=0; i<atomCount; ++i ) {
3220                struct peer_atom * atom = atoms[i];
3221                if( peerIsInUse( t, atom ) )
3222                    keep[keepCount++] = atom;
3223                else
3224                    test[testCount++] = atom;
3225            }
3226
3227            /* if there's room, keep the best of what's left */
3228            i = 0;
3229            if( keepCount < maxAtomCount ) {
3230                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3231                while( i<testCount && keepCount<maxAtomCount )
3232                    keep[keepCount++] = test[i++];
3233            }
3234
3235            /* free the culled atoms */
3236            while( i<testCount )
3237                tr_free( test[i++] );
3238
3239            /* rebuild Torrent.pool with what's left */
3240            tr_ptrArrayDestruct( &t->pool, NULL );
3241            t->pool = TR_PTR_ARRAY_INIT;
3242            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3243            for( i=0; i<keepCount; ++i )
3244                tr_ptrArrayAppend( &t->pool, keep[i] );
3245
3246            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3247
3248            /* cleanup */
3249            tr_free( test );
3250            tr_free( keep );
3251        }
3252    }
3253
3254    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3255    managerUnlock( mgr );
3256}
Note: See TracBrowser for help on using the repository browser.