source: trunk/libtransmission/peer-mgr.c @ 10415

Last change on this file since 10415 was 10415, checked in by charles, 12 years ago

(trunk libT) increase request TTL to 120 seconds

  • Property svn:keywords set to Date Rev Author Id
File size: 100.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 10415 2010-03-22 02:27:57Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max number of peers to ask for per second overall.
64    * this throttle is to avoid overloading the router */
65    MAX_CONNECTIONS_PER_SECOND = 12,
66
67    /* number of bad pieces a peer is allowed to send before we ban them */
68    MAX_BAD_PIECES_PER_PEER = 5,
69
70    /* amount of time to keep a list of request pieces lying around
71       before it's considered too old and needs to be rebuilt */
72    PIECE_LIST_SHELF_LIFE_SECS = 60,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
84
85    /** how long we'll let requests we've made linger before we cancel them */
86    REQUEST_TTL_SECS = 120,
87
88    CANCEL_HISTORY_SEC = 120
89};
90
91
92/**
93***
94**/
95
96enum
97{
98    UPLOAD_ONLY_UKNOWN,
99    UPLOAD_ONLY_YES,
100    UPLOAD_ONLY_NO
101};
102
103/**
104 * Peer information that should be kept even before we've connected and
105 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
106 * which ones would make good candidates for connecting to, and to watch out
107 * for banned peers.
108 *
109 * @see tr_peer
110 * @see tr_peermsgs
111 */
112struct peer_atom
113{
114    tr_peer   * peer;        /* will be NULL if not connected */
115    uint8_t     from;
116    uint8_t     flags;       /* these match the added_f flags */
117    uint8_t     myflags;     /* flags that aren't defined in added_f */
118    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
119    tr_port     port;
120    uint16_t    numFails;
121    tr_address  addr;
122    time_t      time;        /* when the peer's connection status last changed */
123    time_t      piece_data_time;
124
125    /* similar to a TTL field, but less rigid --
126     * if the swarm is small, the atom will be kept past this date. */
127    time_t      shelf_date;
128};
129
130#ifdef NDEBUG
131#define tr_isAtom(a) (TRUE)
132#else
133static tr_bool
134tr_isAtom( const struct peer_atom * atom )
135{
136    return ( atom != NULL )
137        && ( atom->from < TR_PEER_FROM__MAX )
138        && ( tr_isAddress( &atom->addr ) );
139}
140#endif
141
142static const char*
143tr_atomAddrStr( const struct peer_atom * atom )
144{
145    return tr_peerIoAddrStr( &atom->addr, atom->port );
146}
147
148struct block_request
149{
150    tr_block_index_t block;
151    tr_peer * peer;
152    time_t sentAt;
153};
154
155struct weighted_piece
156{
157    tr_piece_index_t index;
158    int16_t salt;
159    int16_t requestCount;
160};
161
162/** @brief Opaque, per-torrent data structure for peer connection information */
163typedef struct tr_torrent_peers
164{
165    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
166    tr_ptrArray                pool; /* struct peer_atom */
167    tr_ptrArray                peers; /* tr_peer */
168    tr_ptrArray                webseeds; /* tr_webseed */
169
170    tr_torrent               * tor;
171    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
172    struct tr_peerMgr        * manager;
173
174    tr_bool                    isRunning;
175    tr_bool                    needsCompletenessCheck;
176
177    struct block_request     * requests;
178    int                        requestCount;
179    int                        requestAlloc;
180
181    struct weighted_piece    * pieces;
182    int                        pieceCount;
183
184    int                        interestedCount;
185
186    /* An arbitrary metric of how congested the downloads are.
187     * Based on how many of requests are cancelled and how many are completed.
188     * Lower values indicate less congestion. */
189    double                     downloadCongestion;
190}
191Torrent;
192
193struct tr_peerMgr
194{
195    tr_session    * session;
196    tr_ptrArray     incomingHandshakes; /* tr_handshake */
197    struct event  * bandwidthTimer;
198    struct event  * rechokeTimer;
199    struct event  * reconnectTimer;
200    struct event  * refillUpkeepTimer;
201    struct event  * atomTimer;
202};
203
204#define tordbg( t, ... ) \
205    do { \
206        if( tr_deepLoggingIsActive( ) ) \
207            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
208    } while( 0 )
209
210#define dbgmsg( ... ) \
211    do { \
212        if( tr_deepLoggingIsActive( ) ) \
213            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
214    } while( 0 )
215
216/**
217***
218**/
219
220static inline void
221managerLock( const struct tr_peerMgr * manager )
222{
223    tr_sessionLock( manager->session );
224}
225
226static inline void
227managerUnlock( const struct tr_peerMgr * manager )
228{
229    tr_sessionUnlock( manager->session );
230}
231
232static inline void
233torrentLock( Torrent * torrent )
234{
235    managerLock( torrent->manager );
236}
237
238static inline void
239torrentUnlock( Torrent * torrent )
240{
241    managerUnlock( torrent->manager );
242}
243
244static inline int
245torrentIsLocked( const Torrent * t )
246{
247    return tr_sessionIsLocked( t->manager->session );
248}
249
250/**
251***
252**/
253
254static int
255handshakeCompareToAddr( const void * va, const void * vb )
256{
257    const tr_handshake * a = va;
258
259    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
260}
261
262static int
263handshakeCompare( const void * a, const void * b )
264{
265    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
266}
267
268static tr_handshake*
269getExistingHandshake( tr_ptrArray      * handshakes,
270                      const tr_address * addr )
271{
272    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
273}
274
275static int
276comparePeerAtomToAddress( const void * va, const void * vb )
277{
278    const struct peer_atom * a = va;
279
280    return tr_compareAddresses( &a->addr, vb );
281}
282
283static int
284compareAtomsByAddress( const void * va, const void * vb )
285{
286    const struct peer_atom * b = vb;
287
288    assert( tr_isAtom( b ) );
289
290    return comparePeerAtomToAddress( va, &b->addr );
291}
292
293/**
294***
295**/
296
297const tr_address *
298tr_peerAddress( const tr_peer * peer )
299{
300    return &peer->atom->addr;
301}
302
303static Torrent*
304getExistingTorrent( tr_peerMgr *    manager,
305                    const uint8_t * hash )
306{
307    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
308
309    return tor == NULL ? NULL : tor->torrentPeers;
310}
311
312static int
313peerCompare( const void * a, const void * b )
314{
315    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
316}
317
318static struct peer_atom*
319getExistingAtom( const Torrent    * t,
320                 const tr_address * addr )
321{
322    Torrent * tt = (Torrent*)t;
323    assert( torrentIsLocked( t ) );
324    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
325}
326
327static tr_bool
328peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
329{
330    Torrent * t = (Torrent*) ct;
331
332    assert( torrentIsLocked ( t ) );
333
334    return ( atom->peer != NULL )
335        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
336        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
337}
338
339static tr_peer*
340peerConstructor( struct peer_atom * atom )
341{
342    tr_peer * peer = tr_new0( tr_peer, 1 );
343
344    tr_bitsetConstructor( &peer->have, 0 );
345
346    peer->atom = atom;
347    atom->peer = peer;
348
349    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
350    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
351    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
352    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
353
354    return peer;
355}
356
357static tr_peer*
358getPeer( Torrent * torrent, struct peer_atom * atom )
359{
360    tr_peer * peer;
361
362    assert( torrentIsLocked( torrent ) );
363
364    peer = atom->peer;
365
366    if( peer == NULL )
367    {
368        peer = peerConstructor( atom );
369        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
370    }
371
372    return peer;
373}
374
375static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
376
377static void
378peerDestructor( Torrent * t, tr_peer * peer )
379{
380    assert( peer != NULL );
381
382    peerDeclinedAllRequests( t, peer );
383
384    if( peer->msgs != NULL )
385    {
386        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
387        tr_peerMsgsFree( peer->msgs );
388    }
389
390    tr_peerIoClear( peer->io );
391    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
392
393    tr_historyFree( peer->blocksSentToClient  );
394    tr_historyFree( peer->blocksSentToPeer    );
395    tr_historyFree( peer->cancelsSentToClient );
396    tr_historyFree( peer->cancelsSentToPeer   );
397
398    tr_bitsetDestructor( &peer->have );
399    tr_bitfieldFree( peer->blame );
400    tr_free( peer->client );
401    peer->atom->peer = NULL;
402
403    tr_free( peer );
404}
405
406static void
407removePeer( Torrent * t, tr_peer * peer )
408{
409    tr_peer * removed;
410    struct peer_atom * atom = peer->atom;
411
412    assert( torrentIsLocked( t ) );
413    assert( atom );
414
415    atom->time = tr_time( );
416
417    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
418    assert( removed == peer );
419    peerDestructor( t, removed );
420}
421
422static void
423removeAllPeers( Torrent * t )
424{
425    while( !tr_ptrArrayEmpty( &t->peers ) )
426        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
427}
428
429static void
430torrentDestructor( void * vt )
431{
432    Torrent * t = vt;
433
434    assert( t );
435    assert( !t->isRunning );
436    assert( torrentIsLocked( t ) );
437    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
438    assert( tr_ptrArrayEmpty( &t->peers ) );
439
440    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
441    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
442    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
443    tr_ptrArrayDestruct( &t->peers, NULL );
444
445    tr_free( t->requests );
446    tr_free( t->pieces );
447    tr_free( t );
448}
449
450static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
451
452static Torrent*
453torrentConstructor( tr_peerMgr * manager,
454                    tr_torrent * tor )
455{
456    int       i;
457    Torrent * t;
458
459    t = tr_new0( Torrent, 1 );
460    t->manager = manager;
461    t->tor = tor;
462    t->pool = TR_PTR_ARRAY_INIT;
463    t->peers = TR_PTR_ARRAY_INIT;
464    t->webseeds = TR_PTR_ARRAY_INIT;
465    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
466
467    for( i = 0; i < tor->info.webseedCount; ++i )
468    {
469        tr_webseed * w =
470            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
471        tr_ptrArrayAppend( &t->webseeds, w );
472    }
473
474    return t;
475}
476
477tr_peerMgr*
478tr_peerMgrNew( tr_session * session )
479{
480    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
481    m->session = session;
482    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
483    return m;
484}
485
486static void
487deleteTimer( struct event ** t )
488{
489    if( *t != NULL )
490    {
491        evtimer_del( *t );
492        tr_free( *t );
493        *t = NULL;
494    }
495}
496
497static void
498deleteTimers( struct tr_peerMgr * m )
499{
500    deleteTimer( &m->atomTimer );
501    deleteTimer( &m->bandwidthTimer );
502    deleteTimer( &m->rechokeTimer );
503    deleteTimer( &m->reconnectTimer );
504    deleteTimer( &m->refillUpkeepTimer );
505}
506
507void
508tr_peerMgrFree( tr_peerMgr * manager )
509{
510    managerLock( manager );
511
512    deleteTimers( manager );
513
514    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
515     * the item from manager->handshakes, so this is a little roundabout... */
516    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
517        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
518
519    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
520
521    managerUnlock( manager );
522    tr_free( manager );
523}
524
525static int
526clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
527{
528    if( !tr_torrentHasMetadata( tor ) )
529        return TRUE;
530
531    return peer->clientIsInterested && !peer->clientIsChoked;
532}
533
534static int
535clientIsUploadingTo( const tr_peer * peer )
536{
537    return peer->peerIsInterested && !peer->peerIsChoked;
538}
539
540/***
541****
542***/
543
544tr_bool
545tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
546                      const tr_address  * addr )
547{
548    tr_bool isSeed = FALSE;
549    const Torrent * t = tor->torrentPeers;
550    const struct peer_atom * atom = getExistingAtom( t, addr );
551
552    if( atom )
553        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
554
555    return isSeed;
556}
557
558/**
559***  REQUESTS
560***
561*** There are two data structures associated with managing block requests:
562***
563*** 1. Torrent::requests, an array of "struct block_request" which keeps
564***    track of which blocks have been requested, and when, and by which peers.
565***    This is list is used for (a) cancelling requests that have been pending
566***    for too long and (b) avoiding duplicate requests before endgame.
567***
568*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
569***    pieces that we want to request.  It's used to decide which blocks to
570***    return next when tr_peerMgrGetBlockRequests() is called.
571**/
572
573/**
574*** struct block_request
575**/
576
577static int
578compareReqByBlock( const void * va, const void * vb )
579{
580    const struct block_request * a = va;
581    const struct block_request * b = vb;
582
583    /* primary key: block */
584    if( a->block < b->block ) return -1;
585    if( a->block > b->block ) return 1;
586
587    /* secondary key: peer */
588    if( a->peer < b->peer ) return -1;
589    if( a->peer > b->peer ) return 1;
590
591    return 0;
592}
593
594static void
595requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
596{
597    struct block_request key;
598
599    /* ensure enough room is available... */
600    if( t->requestCount + 1 >= t->requestAlloc )
601    {
602        const int CHUNK_SIZE = 128;
603        t->requestAlloc += CHUNK_SIZE;
604        t->requests = tr_renew( struct block_request,
605                                t->requests, t->requestAlloc );
606    }
607
608    /* populate the record we're inserting */
609    key.block = block;
610    key.peer = peer;
611    key.sentAt = tr_time( );
612
613    /* insert the request to our array... */
614    {
615        tr_bool exact;
616        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
617                                       sizeof( struct block_request ),
618                                       compareReqByBlock, &exact );
619        assert( !exact );
620        memmove( t->requests + pos + 1,
621                 t->requests + pos,
622                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
623        t->requests[pos] = key;
624    }
625
626    if( peer != NULL )
627    {
628        ++peer->pendingReqsToPeer;
629        assert( peer->pendingReqsToPeer >= 0 );
630    }
631
632    /*fprintf( stderr, "added request of block %lu from peer %s... "
633                       "there are now %d block\n",
634                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
635}
636
637static struct block_request *
638requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
639{
640    struct block_request key;
641    key.block = block;
642    key.peer = (tr_peer*) peer;
643
644    return bsearch( &key, t->requests, t->requestCount,
645                    sizeof( struct block_request ),
646                    compareReqByBlock );
647}
648
649/* how many peers are we currently requesting this block from... */
650static int
651countBlockRequests( Torrent * t, tr_block_index_t block )
652{
653    tr_bool exact;
654    int i, n, pos;
655    struct block_request key;
656
657    key.block = block;
658    key.peer = NULL;
659    pos = tr_lowerBound( &key, t->requests, t->requestCount,
660                         sizeof( struct block_request ),
661                         compareReqByBlock, &exact );
662
663    assert( !exact ); /* shouldn't have a request with .peer == NULL */
664
665    n = 0;
666    for( i=pos; i<t->requestCount; ++i ) {
667        if( t->requests[i].block == block )
668            ++n;
669        else
670            break;
671    }
672
673    return n;
674}
675
676static void
677decrementPendingReqCount( const struct block_request * b )
678{
679    if( b->peer != NULL )
680        if( b->peer->pendingReqsToPeer > 0 )
681            --b->peer->pendingReqsToPeer;
682}
683
684static void
685requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
686{
687    const struct block_request * b = requestListLookup( t, block, peer );
688    if( b != NULL )
689    {
690        const int pos = b - t->requests;
691        assert( pos < t->requestCount );
692
693        decrementPendingReqCount( b );
694
695        tr_removeElementFromArray( t->requests,
696                                   pos,
697                                   sizeof( struct block_request ),
698                                   t->requestCount-- );
699
700        /*fprintf( stderr, "removing request of block %lu from peer %s... "
701                           "there are now %d block requests left\n",
702                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
703    }
704}
705
706/**
707*** struct weighted_piece
708**/
709
710enum
711{
712    PIECES_UNSORTED,
713    PIECES_SORTED_BY_INDEX,
714    PIECES_SORTED_BY_WEIGHT
715};
716
717const tr_torrent * weightTorrent;
718
719/* we try to create a "weight" s.t. high-priority pieces come before others,
720 * and that partially-complete pieces come before empty ones. */
721static int
722comparePieceByWeight( const void * va, const void * vb )
723{
724    const struct weighted_piece * a = va;
725    const struct weighted_piece * b = vb;
726    int ia, ib, missing, pending;
727    const tr_torrent * tor = weightTorrent;
728
729    /* primary key: weight */
730    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
731    pending = a->requestCount;
732    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
733    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
734    pending = b->requestCount;
735    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
736    if( ia < ib ) return -1;
737    if( ia > ib ) return 1;
738
739    /* secondary key: higher priorities go first */
740    ia = tor->info.pieces[a->index].priority;
741    ib = tor->info.pieces[b->index].priority;
742    if( ia > ib ) return -1;
743    if( ia < ib ) return 1;
744
745    /* tertiary key: random */
746    if( a->salt < b->salt ) return -1;
747    if( a->salt > b->salt ) return 1;
748
749    /* okay, they're equal */
750    return 0;
751}
752
753static int
754comparePieceByIndex( const void * va, const void * vb )
755{
756    const struct weighted_piece * a = va;
757    const struct weighted_piece * b = vb;
758    if( a->index < b->index ) return -1;
759    if( a->index > b->index ) return 1;
760    return 0;
761}
762
763static void
764pieceListSort( Torrent * t, int mode )
765{
766    int(*compar)(const void *, const void *);
767
768    assert( mode==PIECES_SORTED_BY_INDEX
769         || mode==PIECES_SORTED_BY_WEIGHT );
770
771    switch( mode ) {
772        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
773        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
774        default: assert( 0 && "unhandled" );  break;
775    }
776
777    weightTorrent = t->tor;
778    qsort( t->pieces, t->pieceCount,
779           sizeof( struct weighted_piece ), compar );
780}
781
782static tr_bool
783isInEndgame( Torrent * t )
784{
785    tr_bool endgame = FALSE;
786
787    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
788    {
789        const struct weighted_piece * p = t->pieces;
790        const int pending = p->requestCount;
791        const int missing = tr_cpMissingBlocksInPiece( &t->tor->completion, p->index );
792        endgame = pending >= missing;
793    }
794
795    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
796    return endgame;
797}
798
799/**
800 * This function is useful for sanity checking,
801 * but is too expensive even for nightly builds...
802 * let's leave it disabled but add an easy hook to compile it back in
803 */
804#if 0
805static void
806assertWeightedPiecesAreSorted( Torrent * t )
807{
808    if( !isInEndgame( t ) )
809    {
810        int i;
811        weightTorrent = t->tor;
812        for( i=0; i<t->pieceCount-1; ++i )
813            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
814    }
815}
816#else
817#define assertWeightedPiecesAreSorted(t)
818#endif
819
820static struct weighted_piece *
821pieceListLookup( Torrent * t, tr_piece_index_t index )
822{
823    int i;
824
825    for( i=0; i<t->pieceCount; ++i )
826        if( t->pieces[i].index == index )
827            return &t->pieces[i];
828
829    return NULL;
830}
831
832static void
833pieceListRebuild( Torrent * t )
834{
835    assertWeightedPiecesAreSorted( t );
836
837    if( !tr_torrentIsSeed( t->tor ) )
838    {
839        tr_piece_index_t i;
840        tr_piece_index_t * pool;
841        tr_piece_index_t poolCount = 0;
842        const tr_torrent * tor = t->tor;
843        const tr_info * inf = tr_torrentInfo( tor );
844        struct weighted_piece * pieces;
845        int pieceCount;
846
847        /* build the new list */
848        pool = tr_new( tr_piece_index_t, inf->pieceCount );
849        for( i=0; i<inf->pieceCount; ++i )
850            if( !inf->pieces[i].dnd )
851                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
852                    pool[poolCount++] = i;
853        pieceCount = poolCount;
854        pieces = tr_new0( struct weighted_piece, pieceCount );
855        for( i=0; i<poolCount; ++i ) {
856            struct weighted_piece * piece = pieces + i;
857            piece->index = pool[i];
858            piece->requestCount = 0;
859            piece->salt = tr_cryptoWeakRandInt( 4096 );
860        }
861
862        /* if we already had a list of pieces, merge it into
863         * the new list so we don't lose its requestCounts */
864        if( t->pieces != NULL )
865        {
866            struct weighted_piece * o = t->pieces;
867            struct weighted_piece * oend = o + t->pieceCount;
868            struct weighted_piece * n = pieces;
869            struct weighted_piece * nend = n + pieceCount;
870
871            pieceListSort( t, PIECES_SORTED_BY_INDEX );
872
873            while( o!=oend && n!=nend ) {
874                if( o->index < n->index )
875                    ++o;
876                else if( o->index > n->index )
877                    ++n;
878                else
879                    *n++ = *o++;
880            }
881
882            tr_free( t->pieces );
883        }
884
885        t->pieces = pieces;
886        t->pieceCount = pieceCount;
887
888        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
889
890        /* cleanup */
891        tr_free( pool );
892    }
893}
894
895static void
896pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
897{
898    struct weighted_piece * p;
899
900    assertWeightedPiecesAreSorted( t );
901
902    if(( p = pieceListLookup( t, piece )))
903    {
904        const int pos = p - t->pieces;
905
906        tr_removeElementFromArray( t->pieces,
907                                   pos,
908                                   sizeof( struct weighted_piece ),
909                                   t->pieceCount-- );
910
911        if( t->pieceCount == 0 )
912        {
913            tr_free( t->pieces );
914            t->pieces = NULL;
915        }
916    }
917
918    assertWeightedPiecesAreSorted( t );
919}
920
921static void
922pieceListResortPiece( Torrent * t, struct weighted_piece * p )
923{
924    int pos;
925    tr_bool isSorted = TRUE;
926
927    if( p == NULL )
928        return;
929
930    /* is the torrent already sorted? */
931    pos = p - t->pieces;
932    weightTorrent = t->tor;
933    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
934        isSorted = FALSE;
935    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
936        isSorted = FALSE;
937
938    /* if it's not sorted, move it around */
939    if( !isSorted )
940    {
941        tr_bool exact;
942        const struct weighted_piece tmp = *p;
943
944        tr_removeElementFromArray( t->pieces,
945                                   pos,
946                                   sizeof( struct weighted_piece ),
947                                   t->pieceCount-- );
948
949        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
950                             sizeof( struct weighted_piece ),
951                             comparePieceByWeight, &exact );
952
953        memmove( &t->pieces[pos + 1],
954                 &t->pieces[pos],
955                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
956
957        t->pieces[pos] = tmp;
958    }
959
960    assertWeightedPiecesAreSorted( t );
961}
962
963static void
964pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
965{
966    struct weighted_piece * p;
967    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
968
969    assertWeightedPiecesAreSorted( t );
970
971    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
972    {
973        --p->requestCount;
974        pieceListResortPiece( t, p );
975    }
976
977    assertWeightedPiecesAreSorted( t );
978}
979
980/**
981***
982**/
983
984void
985tr_peerMgrRebuildRequests( tr_torrent * tor )
986{
987    assert( tr_isTorrent( tor ) );
988
989    pieceListRebuild( tor->torrentPeers );
990}
991
992void
993tr_peerMgrGetNextRequests( tr_torrent           * tor,
994                           tr_peer              * peer,
995                           int                    numwant,
996                           tr_block_index_t     * setme,
997                           int                  * numgot )
998{
999    int i;
1000    int got;
1001    Torrent * t;
1002    tr_bool endgame;
1003    struct weighted_piece * pieces;
1004    const tr_bitset * have = &peer->have;
1005
1006    /* sanity clause */
1007    assert( tr_isTorrent( tor ) );
1008    assert( peer->clientIsInterested );
1009    assert( !peer->clientIsChoked );
1010    assert( numwant > 0 );
1011
1012    /* walk through the pieces and find blocks that should be requested */
1013    got = 0;
1014    t = tor->torrentPeers;
1015    assertWeightedPiecesAreSorted( t );
1016
1017    /* prep the pieces list */
1018    if( t->pieces == NULL )
1019        pieceListRebuild( t );
1020
1021    endgame = isInEndgame( t );
1022
1023    pieces = t->pieces;
1024    for( i=0; i<t->pieceCount && got<numwant; ++i )
1025    {
1026        struct weighted_piece * p = pieces + i;
1027        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1028        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1029
1030        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1031            continue;
1032
1033        /* if the peer has this piece that we want... */
1034        if( tr_bitsetHasFast( have, p->index ) )
1035        {
1036            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1037            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1038
1039            for( ; b!=e && got<numwant; ++b )
1040            {
1041                /* don't request blocks we've already got */
1042                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1043                    continue;
1044
1045                /* don't send the same request to the same peer twice */
1046                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1047                    continue;
1048
1049                /* don't send the same request to any peer too many times */
1050                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1051                    continue;
1052
1053                /* update the caller's table */
1054                setme[got++] = b;
1055
1056                /* update our own tables */
1057                requestListAdd( t, b, peer );
1058                ++p->requestCount;
1059            }
1060        }
1061    }
1062
1063    /* In most cases we've just changed the weights of a small number of pieces.
1064     * So rather than qsort()ing the entire array, it's faster to apply an
1065     * adaptive insertion sort algorithm. */
1066    if( got > 0 )
1067    {
1068        /* not enough requests || last piece modified */
1069        if ( i == t->pieceCount ) --i;
1070
1071        weightTorrent = t->tor;
1072        while( --i >= 0 )
1073        {
1074            tr_bool exact;
1075
1076            /* relative position! */
1077            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1078                                              t->pieceCount - (i + 1),
1079                                              sizeof( struct weighted_piece ),
1080                                              comparePieceByWeight, &exact );
1081            if( newpos > 0 )
1082            {
1083                const struct weighted_piece piece = t->pieces[i];
1084                memmove( &t->pieces[i],
1085                         &t->pieces[i + 1],
1086                         sizeof( struct weighted_piece ) * ( newpos ) );
1087                t->pieces[i + newpos] = piece;
1088            }
1089        }
1090    }
1091
1092    assertWeightedPiecesAreSorted( t );
1093    *numgot = got;
1094}
1095
1096tr_bool
1097tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1098                          const tr_peer     * peer,
1099                          tr_block_index_t    block )
1100{
1101    const Torrent * t = tor->torrentPeers;
1102    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1103}
1104
1105/* cancel requests that are too old */
1106static void
1107refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1108{
1109    time_t now;
1110    uint64_t now_msec;
1111    time_t too_old;
1112    tr_torrent * tor;
1113    tr_peerMgr * mgr = vmgr;
1114    managerLock( mgr );
1115
1116    now = tr_time( );
1117    now_msec = tr_date( );
1118    too_old = now - REQUEST_TTL_SECS;
1119
1120    tor = NULL;
1121    while(( tor = tr_torrentNext( mgr->session, tor )))
1122    {
1123        Torrent * t = tor->torrentPeers;
1124        const int n = t->requestCount;
1125        if( n > 0 )
1126        {
1127            int keepCount = 0;
1128            int cancelCount = 0;
1129            struct block_request * cancel = tr_new( struct block_request, n );
1130            const struct block_request * it;
1131            const struct block_request * end;
1132
1133            for( it=t->requests, end=it+n; it!=end; ++it )
1134            {
1135                if( ( it->sentAt <= too_old ) && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1136                    cancel[cancelCount++] = *it;
1137                else
1138                {
1139                    if( it != &t->requests[keepCount] )
1140                        t->requests[keepCount] = *it;
1141                    keepCount++;
1142                }
1143            }
1144
1145            /* prune out the ones we aren't keeping */
1146            t->requestCount = keepCount;
1147
1148            /* send cancel messages for all the "cancel" ones */
1149            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1150                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1151                    tr_historyAdd( it->peer->cancelsSentToPeer, now_msec, 1 );
1152                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1153                    decrementPendingReqCount( it );
1154                }
1155            }
1156
1157            /* decrement the pending request counts for the timed-out blocks */
1158            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1159                pieceListRemoveRequest( t, it->block );
1160
1161            /* cleanup loop */
1162            tr_free( cancel );
1163        }
1164    }
1165
1166    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1167    managerUnlock( mgr );
1168}
1169
1170static void
1171addStrike( Torrent * t, tr_peer * peer )
1172{
1173    tordbg( t, "increasing peer %s strike count to %d",
1174            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1175
1176    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1177    {
1178        struct peer_atom * atom = peer->atom;
1179        atom->myflags |= MYFLAG_BANNED;
1180        peer->doPurge = 1;
1181        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1182    }
1183}
1184
1185static void
1186gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1187{
1188    tr_torrent *   tor = t->tor;
1189    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1190
1191    tor->corruptCur += byteCount;
1192    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1193
1194    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1195}
1196
1197static void
1198peerSuggestedPiece( Torrent            * t UNUSED,
1199                    tr_peer            * peer UNUSED,
1200                    tr_piece_index_t     pieceIndex UNUSED,
1201                    int                  isFastAllowed UNUSED )
1202{
1203#if 0
1204    assert( t );
1205    assert( peer );
1206    assert( peer->msgs );
1207
1208    /* is this a valid piece? */
1209    if(  pieceIndex >= t->tor->info.pieceCount )
1210        return;
1211
1212    /* don't ask for it if we've already got it */
1213    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1214        return;
1215
1216    /* don't ask for it if they don't have it */
1217    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1218        return;
1219
1220    /* don't ask for it if we're choked and it's not fast */
1221    if( !isFastAllowed && peer->clientIsChoked )
1222        return;
1223
1224    /* request the blocks that we don't have in this piece */
1225    {
1226        tr_block_index_t block;
1227        const tr_torrent * tor = t->tor;
1228        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1229        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1230
1231        for( block=start; block<end; ++block )
1232        {
1233            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1234            {
1235                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1236                const uint32_t length = tr_torBlockCountBytes( tor, block );
1237                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1238                incrementPieceRequests( t, pieceIndex );
1239            }
1240        }
1241    }
1242#endif
1243}
1244
1245static void
1246removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1247{
1248    requestListRemove( t, block, peer );
1249    pieceListRemoveRequest( t, block );
1250}
1251
1252/* peer choked us, or maybe it disconnected.
1253   either way we need to remove all its requests */
1254static void
1255peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1256{
1257    int i, n;
1258    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1259
1260    for( i=n=0; i<t->requestCount; ++i )
1261        if( peer == t->requests[i].peer )
1262            blocks[n++] = t->requests[i].block;
1263
1264    for( i=0; i<n; ++i )
1265        removeRequestFromTables( t, blocks[i], peer );
1266
1267    tr_free( blocks );
1268}
1269
1270static void
1271peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1272{
1273    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1274    Torrent * t = vt;
1275    const tr_peer_event * e = vevent;
1276
1277    torrentLock( t );
1278
1279    switch( e->eventType )
1280    {
1281        case TR_PEER_UPLOAD_ONLY:
1282            /* update our atom */
1283            if( peer ) {
1284                if( e->uploadOnly ) {
1285                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1286                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1287                } else {
1288                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1289                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1290                }
1291            }
1292            break;
1293
1294        case TR_PEER_PEER_GOT_DATA:
1295        {
1296            const time_t now = tr_time( );
1297            tr_torrent * tor = t->tor;
1298
1299            tr_torrentSetActivityDate( tor, now );
1300
1301            if( e->wasPieceData ) {
1302                tor->uploadedCur += e->length;
1303                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1304                tr_torrentSetDirty( tor );
1305            }
1306
1307            /* update the stats */
1308            if( e->wasPieceData )
1309                tr_statsAddUploaded( tor->session, e->length );
1310
1311            /* update our atom */
1312            if( peer && e->wasPieceData )
1313                peer->atom->piece_data_time = now;
1314
1315            tor->needsSeedRatioCheck = TRUE;
1316
1317            break;
1318        }
1319
1320        case TR_PEER_CLIENT_GOT_REJ:
1321            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1322            break;
1323
1324        case TR_PEER_CLIENT_GOT_CHOKE:
1325            peerDeclinedAllRequests( t, peer );
1326            break;
1327
1328        case TR_PEER_CLIENT_GOT_PORT:
1329            if( peer )
1330                peer->atom->port = e->port;
1331            break;
1332
1333        case TR_PEER_CLIENT_GOT_SUGGEST:
1334            if( peer )
1335                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1336            break;
1337
1338        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1339            if( peer )
1340                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1341            break;
1342
1343        case TR_PEER_CLIENT_GOT_DATA:
1344        {
1345            const time_t now = tr_time( );
1346            tr_torrent * tor = t->tor;
1347
1348            tr_torrentSetActivityDate( tor, now );
1349
1350            if( e->wasPieceData ) {
1351                tor->downloadedCur += e->length;
1352                tr_torrentSetDirty( tor );
1353            }
1354
1355            /* update the stats */
1356            if( e->wasPieceData )
1357                tr_statsAddDownloaded( tor->session, e->length );
1358
1359            /* update our atom */
1360            if( peer && e->wasPieceData )
1361                peer->atom->piece_data_time = now;
1362
1363            break;
1364        }
1365
1366        case TR_PEER_PEER_PROGRESS:
1367        {
1368            if( peer )
1369            {
1370                struct peer_atom * atom = peer->atom;
1371                if( e->progress >= 1.0 ) {
1372                    tordbg( t, "marking peer %s as a seed",
1373                            tr_atomAddrStr( atom ) );
1374                    atom->flags |= ADDED_F_SEED_FLAG;
1375                }
1376            }
1377            break;
1378        }
1379
1380        case TR_PEER_CLIENT_GOT_BLOCK:
1381        {
1382            tr_torrent * tor = t->tor;
1383            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1384
1385            requestListRemove( t, block, peer );
1386            pieceListRemoveRequest( t, block );
1387
1388            if( peer != NULL )
1389                tr_historyAdd( peer->blocksSentToClient, tr_date( ), 1 );
1390
1391            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1392            {
1393                /* we already have this block... */
1394                const uint32_t n = tr_torBlockCountBytes( tor, block );
1395                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1396                tordbg( t, "we have this block already..." );
1397            }
1398            else
1399            {
1400                tr_cpBlockAdd( &tor->completion, block );
1401                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1402                tr_torrentSetDirty( tor );
1403
1404                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1405                {
1406                    const tr_piece_index_t p = e->pieceIndex;
1407                    const tr_bool ok = tr_ioTestPiece( tor, p );
1408
1409                    if( !ok )
1410                    {
1411                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1412                                   (unsigned long)p );
1413                    }
1414
1415                    tr_torrentSetHasPiece( tor, p, ok );
1416                    tr_torrentSetPieceChecked( tor, p, TRUE );
1417                    tr_peerMgrSetBlame( tor, p, ok );
1418
1419                    if( !ok )
1420                    {
1421                        gotBadPiece( t, p );
1422                    }
1423                    else
1424                    {
1425                        int i;
1426                        int peerCount;
1427                        tr_peer ** peers;
1428                        tr_file_index_t fileIndex;
1429
1430                        /* only add this to downloadedCur if we got it from a peer --
1431                         * webseeds shouldn't count against our ratio.  As one tracker
1432                         * admin put it, "Those pieces are downloaded directly from the
1433                         * content distributor, not the peers, it is the tracker's job
1434                         * to manage the swarms, not the web server and does not fit
1435                         * into the jurisdiction of the tracker." */
1436                        if( peer != NULL ) {
1437                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1438                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1439                        }
1440
1441                        peerCount = tr_ptrArraySize( &t->peers );
1442                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1443                        for( i=0; i<peerCount; ++i )
1444                            tr_peerMsgsHave( peers[i]->msgs, p );
1445
1446                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1447                            const tr_file * file = &tor->info.files[fileIndex];
1448                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1449                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1450                                    tr_torrentFileCompleted( tor, fileIndex );
1451                        }
1452
1453                        pieceListRemovePiece( t, p );
1454                    }
1455                }
1456
1457                t->needsCompletenessCheck = TRUE;
1458            }
1459            break;
1460        }
1461
1462        case TR_PEER_ERROR:
1463            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1464            {
1465                /* some protocol error from the peer */
1466                peer->doPurge = 1;
1467                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1468                        tr_atomAddrStr( peer->atom ) );
1469            }
1470            else
1471            {
1472                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1473            }
1474            break;
1475
1476        default:
1477            assert( 0 );
1478    }
1479
1480    torrentUnlock( t );
1481}
1482
1483static int
1484getDefaultShelfLife( uint8_t from )
1485{
1486    /* in general, peers obtained from firsthand contact
1487     * are better than those from secondhand, etc etc */
1488    switch( from )
1489    {
1490        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1491        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1492        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1493        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1494        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1495        case TR_PEER_FROM_RESUME   : return 60 * 60;
1496        default                    : return 60 * 60;
1497    }
1498}
1499
1500
1501static void
1502ensureAtomExists( Torrent           * t,
1503                  const tr_address  * addr,
1504                  const tr_port       port,
1505                  const uint8_t       flags,
1506                  const uint8_t       from )
1507{
1508    assert( tr_isAddress( addr ) );
1509    assert( from < TR_PEER_FROM__MAX );
1510
1511    if( getExistingAtom( t, addr ) == NULL )
1512    {
1513        struct peer_atom * a;
1514        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1515
1516        a = tr_new0( struct peer_atom, 1 );
1517        a->addr = *addr;
1518        a->port = port;
1519        a->flags = flags;
1520        a->from = from;
1521        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1522        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1523
1524        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1525    }
1526}
1527
1528static int
1529getMaxPeerCount( const tr_torrent * tor )
1530{
1531    return tor->maxConnectedPeers;
1532}
1533
1534static int
1535getPeerCount( const Torrent * t )
1536{
1537    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1538}
1539
1540/* FIXME: this is kind of a mess. */
1541static tr_bool
1542myHandshakeDoneCB( tr_handshake  * handshake,
1543                   tr_peerIo     * io,
1544                   tr_bool         isConnected,
1545                   const uint8_t * peer_id,
1546                   void          * vmanager )
1547{
1548    tr_bool            ok = isConnected;
1549    tr_bool            success = FALSE;
1550    tr_port            port;
1551    const tr_address * addr;
1552    tr_peerMgr       * manager = vmanager;
1553    Torrent          * t;
1554    tr_handshake     * ours;
1555
1556    assert( io );
1557    assert( tr_isBool( ok ) );
1558
1559    t = tr_peerIoHasTorrentHash( io )
1560        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1561        : NULL;
1562
1563    if( tr_peerIoIsIncoming ( io ) )
1564        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1565                                        handshake, handshakeCompare );
1566    else if( t )
1567        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1568                                        handshake, handshakeCompare );
1569    else
1570        ours = handshake;
1571
1572    assert( ours );
1573    assert( ours == handshake );
1574
1575    if( t )
1576        torrentLock( t );
1577
1578    addr = tr_peerIoGetAddress( io, &port );
1579
1580    if( !ok || !t || !t->isRunning )
1581    {
1582        if( t )
1583        {
1584            struct peer_atom * atom = getExistingAtom( t, addr );
1585            if( atom )
1586                ++atom->numFails;
1587        }
1588    }
1589    else /* looking good */
1590    {
1591        struct peer_atom * atom;
1592
1593        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1594        atom = getExistingAtom( t, addr );
1595        atom->time = tr_time( );
1596        atom->piece_data_time = 0;
1597
1598        if( atom->myflags & MYFLAG_BANNED )
1599        {
1600            tordbg( t, "banned peer %s tried to reconnect",
1601                    tr_atomAddrStr( atom ) );
1602        }
1603        else if( tr_peerIoIsIncoming( io )
1604               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1605
1606        {
1607        }
1608        else
1609        {
1610            tr_peer * peer = atom->peer;
1611
1612            if( peer ) /* we already have this peer */
1613            {
1614            }
1615            else
1616            {
1617                peer = getPeer( t, atom );
1618                tr_free( peer->client );
1619
1620                if( !peer_id )
1621                    peer->client = NULL;
1622                else {
1623                    char client[128];
1624                    tr_clientForId( client, sizeof( client ), peer_id );
1625                    peer->client = tr_strdup( client );
1626                }
1627
1628                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1629                                                                balanced by our unref in peerDestructor()  */
1630                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1631                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1632
1633                success = TRUE;
1634            }
1635        }
1636    }
1637
1638    if( t )
1639        torrentUnlock( t );
1640
1641    return success;
1642}
1643
1644void
1645tr_peerMgrAddIncoming( tr_peerMgr * manager,
1646                       tr_address * addr,
1647                       tr_port      port,
1648                       int          socket )
1649{
1650    tr_session * session;
1651
1652    managerLock( manager );
1653
1654    assert( tr_isSession( manager->session ) );
1655    session = manager->session;
1656
1657    if( tr_sessionIsAddressBlocked( session, addr ) )
1658    {
1659        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1660        tr_netClose( session, socket );
1661    }
1662    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1663    {
1664        tr_netClose( session, socket );
1665    }
1666    else /* we don't have a connection to them yet... */
1667    {
1668        tr_peerIo *    io;
1669        tr_handshake * handshake;
1670
1671        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1672
1673        handshake = tr_handshakeNew( io,
1674                                     session->encryptionMode,
1675                                     myHandshakeDoneCB,
1676                                     manager );
1677
1678        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1679
1680        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1681                                 handshakeCompare );
1682    }
1683
1684    managerUnlock( manager );
1685}
1686
1687static tr_bool
1688tr_isPex( const tr_pex * pex )
1689{
1690    return pex && tr_isAddress( &pex->addr );
1691}
1692
1693void
1694tr_peerMgrAddPex( tr_torrent   *  tor,
1695                  uint8_t         from,
1696                  const tr_pex *  pex )
1697{
1698    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1699    {
1700        Torrent * t = tor->torrentPeers;
1701        managerLock( t->manager );
1702
1703        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1704            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1705                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1706
1707        managerUnlock( t->manager );
1708    }
1709}
1710
1711tr_pex *
1712tr_peerMgrCompactToPex( const void *    compact,
1713                        size_t          compactLen,
1714                        const uint8_t * added_f,
1715                        size_t          added_f_len,
1716                        size_t *        pexCount )
1717{
1718    size_t          i;
1719    size_t          n = compactLen / 6;
1720    const uint8_t * walk = compact;
1721    tr_pex *        pex = tr_new0( tr_pex, n );
1722
1723    for( i = 0; i < n; ++i )
1724    {
1725        pex[i].addr.type = TR_AF_INET;
1726        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1727        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1728        if( added_f && ( n == added_f_len ) )
1729            pex[i].flags = added_f[i];
1730    }
1731
1732    *pexCount = n;
1733    return pex;
1734}
1735
1736tr_pex *
1737tr_peerMgrCompact6ToPex( const void    * compact,
1738                         size_t          compactLen,
1739                         const uint8_t * added_f,
1740                         size_t          added_f_len,
1741                         size_t        * pexCount )
1742{
1743    size_t          i;
1744    size_t          n = compactLen / 18;
1745    const uint8_t * walk = compact;
1746    tr_pex *        pex = tr_new0( tr_pex, n );
1747
1748    for( i = 0; i < n; ++i )
1749    {
1750        pex[i].addr.type = TR_AF_INET6;
1751        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1752        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1753        if( added_f && ( n == added_f_len ) )
1754            pex[i].flags = added_f[i];
1755    }
1756
1757    *pexCount = n;
1758    return pex;
1759}
1760
1761tr_pex *
1762tr_peerMgrArrayToPex( const void * array,
1763                      size_t       arrayLen,
1764                      size_t      * pexCount )
1765{
1766    size_t          i;
1767    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1768    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1769    const uint8_t * walk = array;
1770    tr_pex        * pex = tr_new0( tr_pex, n );
1771
1772    for( i = 0 ; i < n ; i++ ) {
1773        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1774        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1775        pex[i].flags = 0x00;
1776        walk += sizeof( tr_address ) + 2;
1777    }
1778
1779    *pexCount = n;
1780    return pex;
1781}
1782
1783/**
1784***
1785**/
1786
1787void
1788tr_peerMgrSetBlame( tr_torrent     * tor,
1789                    tr_piece_index_t pieceIndex,
1790                    int              success )
1791{
1792    if( !success )
1793    {
1794        int        peerCount, i;
1795        Torrent *  t = tor->torrentPeers;
1796        tr_peer ** peers;
1797
1798        assert( torrentIsLocked( t ) );
1799
1800        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1801        for( i = 0; i < peerCount; ++i )
1802        {
1803            tr_peer * peer = peers[i];
1804            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1805            {
1806                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1807                        tr_atomAddrStr( peer->atom ),
1808                        pieceIndex, (int)peer->strikes + 1 );
1809                addStrike( t, peer );
1810            }
1811        }
1812    }
1813}
1814
1815int
1816tr_pexCompare( const void * va, const void * vb )
1817{
1818    const tr_pex * a = va;
1819    const tr_pex * b = vb;
1820    int i;
1821
1822    assert( tr_isPex( a ) );
1823    assert( tr_isPex( b ) );
1824
1825    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1826        return i;
1827
1828    if( a->port != b->port )
1829        return a->port < b->port ? -1 : 1;
1830
1831    return 0;
1832}
1833
1834#if 0
1835static int
1836peerPrefersCrypto( const tr_peer * peer )
1837{
1838    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1839        return TRUE;
1840
1841    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1842        return FALSE;
1843
1844    return tr_peerIoIsEncrypted( peer->io );
1845}
1846#endif
1847
1848/* better goes first */
1849static int
1850compareAtomsByUsefulness( const void * va, const void *vb )
1851{
1852    const struct peer_atom * a = * (const struct peer_atom**) va;
1853    const struct peer_atom * b = * (const struct peer_atom**) vb;
1854
1855    assert( tr_isAtom( a ) );
1856    assert( tr_isAtom( b ) );
1857
1858    if( a->piece_data_time != b->piece_data_time )
1859        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1860    if( a->from != b->from )
1861        return a->from < b->from ? -1 : 1;
1862    if( a->numFails != b->numFails )
1863        return a->numFails < b->numFails ? -1 : 1;
1864
1865    return 0;
1866}
1867
1868int
1869tr_peerMgrGetPeers( tr_torrent   * tor,
1870                    tr_pex      ** setme_pex,
1871                    uint8_t        af,
1872                    uint8_t        list_mode,
1873                    int            maxCount )
1874{
1875    int i;
1876    int n;
1877    int count = 0;
1878    int atomCount = 0;
1879    const Torrent * t = tor->torrentPeers;
1880    struct peer_atom ** atoms = NULL;
1881    tr_pex * pex;
1882    tr_pex * walk;
1883
1884    assert( tr_isTorrent( tor ) );
1885    assert( setme_pex != NULL );
1886    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1887    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1888
1889    managerLock( t->manager );
1890
1891    /**
1892    ***  build a list of atoms
1893    **/
1894
1895    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1896    {
1897        int i;
1898        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1899        atomCount = tr_ptrArraySize( &t->peers );
1900        atoms = tr_new( struct peer_atom *, atomCount );
1901        for( i=0; i<atomCount; ++i )
1902            atoms[i] = peers[i]->atom;
1903    }
1904    else /* TR_PEERS_ALL */
1905    {
1906        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1907        atomCount = tr_ptrArraySize( &t->pool );
1908        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1909    }
1910
1911    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1912
1913    /**
1914    ***  add the first N of them into our return list
1915    **/
1916
1917    n = MIN( atomCount, maxCount );
1918    pex = walk = tr_new0( tr_pex, n );
1919
1920    for( i=0; i<atomCount && count<n; ++i )
1921    {
1922        const struct peer_atom * atom = atoms[i];
1923        if( atom->addr.type == af )
1924        {
1925            assert( tr_isAddress( &atom->addr ) );
1926            walk->addr = atom->addr;
1927            walk->port = atom->port;
1928            walk->flags = atom->flags;
1929            ++count;
1930            ++walk;
1931        }
1932    }
1933
1934    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1935
1936    assert( ( walk - pex ) == count );
1937    *setme_pex = pex;
1938
1939    /* cleanup */
1940    tr_free( atoms );
1941    managerUnlock( t->manager );
1942    return count;
1943}
1944
1945static void atomPulse      ( int, short, void * );
1946static void bandwidthPulse ( int, short, void * );
1947static void rechokePulse   ( int, short, void * );
1948static void reconnectPulse ( int, short, void * );
1949
1950static struct event *
1951createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1952{
1953    struct event * timer = tr_new0( struct event, 1 );
1954    evtimer_set( timer, callback, cbdata );
1955    tr_timerAddMsec( timer, msec );
1956    return timer;
1957}
1958
1959static void
1960ensureMgrTimersExist( struct tr_peerMgr * m )
1961{
1962    if( m->atomTimer == NULL )
1963        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1964
1965    if( m->bandwidthTimer == NULL )
1966        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1967
1968    if( m->rechokeTimer == NULL )
1969        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1970
1971    if( m->reconnectTimer == NULL )
1972        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1973
1974    if( m->refillUpkeepTimer == NULL )
1975        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1976}
1977
1978void
1979tr_peerMgrStartTorrent( tr_torrent * tor )
1980{
1981    Torrent * t = tor->torrentPeers;
1982
1983    assert( t != NULL );
1984    managerLock( t->manager );
1985    ensureMgrTimersExist( t->manager );
1986
1987    t->isRunning = TRUE;
1988
1989    rechokePulse( 0, 0, t->manager );
1990    managerUnlock( t->manager );
1991}
1992
1993static void
1994stopTorrent( Torrent * t )
1995{
1996    int i, n;
1997
1998    assert( torrentIsLocked( t ) );
1999
2000    t->isRunning = FALSE;
2001
2002    /* disconnect the peers. */
2003    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2004        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2005    tr_ptrArrayClear( &t->peers );
2006
2007    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
2008     * which removes the handshake from t->outgoingHandshakes... */
2009    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2010        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2011}
2012
2013void
2014tr_peerMgrStopTorrent( tr_torrent * tor )
2015{
2016    Torrent * t = tor->torrentPeers;
2017
2018    managerLock( t->manager );
2019
2020    stopTorrent( t );
2021
2022    managerUnlock( t->manager );
2023}
2024
2025void
2026tr_peerMgrAddTorrent( tr_peerMgr * manager,
2027                      tr_torrent * tor )
2028{
2029    managerLock( manager );
2030
2031    assert( tor );
2032    assert( tor->torrentPeers == NULL );
2033
2034    tor->torrentPeers = torrentConstructor( manager, tor );
2035
2036    managerUnlock( manager );
2037}
2038
2039void
2040tr_peerMgrRemoveTorrent( tr_torrent * tor )
2041{
2042    tr_torrentLock( tor );
2043
2044    stopTorrent( tor->torrentPeers );
2045    torrentDestructor( tor->torrentPeers );
2046
2047    tr_torrentUnlock( tor );
2048}
2049
2050void
2051tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2052                               int8_t           * tab,
2053                               unsigned int       tabCount )
2054{
2055    tr_piece_index_t   i;
2056    const Torrent *    t;
2057    float              interval;
2058    tr_bool            isSeed;
2059    int                peerCount;
2060    const tr_peer **   peers;
2061    tr_torrentLock( tor );
2062
2063    t = tor->torrentPeers;
2064    tor = t->tor;
2065    interval = tor->info.pieceCount / (float)tabCount;
2066    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2067    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2068    peerCount = tr_ptrArraySize( &t->peers );
2069
2070    memset( tab, 0, tabCount );
2071
2072    for( i = 0; tor && i < tabCount; ++i )
2073    {
2074        const int piece = i * interval;
2075
2076        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2077            tab[i] = -1;
2078        else if( peerCount ) {
2079            int j;
2080            for( j = 0; j < peerCount; ++j )
2081                if( tr_bitsetHas( &peers[j]->have, i ) )
2082                    ++tab[i];
2083        }
2084    }
2085
2086    tr_torrentUnlock( tor );
2087}
2088
2089/* Returns the pieces that are available from peers */
2090tr_bitfield*
2091tr_peerMgrGetAvailable( const tr_torrent * tor )
2092{
2093    int i;
2094    int peerCount;
2095    Torrent * t = tor->torrentPeers;
2096    const tr_peer ** peers;
2097    tr_bitfield * pieces;
2098    managerLock( t->manager );
2099
2100    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2101    peerCount = tr_ptrArraySize( &t->peers );
2102    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2103    for( i=0; i<peerCount; ++i )
2104        tr_bitsetOr( pieces, &peers[i]->have );
2105
2106    managerUnlock( t->manager );
2107    return pieces;
2108}
2109
2110void
2111tr_peerMgrTorrentStats( tr_torrent       * tor,
2112                        int              * setmePeersKnown,
2113                        int              * setmePeersConnected,
2114                        int              * setmeSeedsConnected,
2115                        int              * setmeWebseedsSendingToUs,
2116                        int              * setmePeersSendingToUs,
2117                        int              * setmePeersGettingFromUs,
2118                        int              * setmePeersFrom )
2119{
2120    int i, size;
2121    const Torrent * t = tor->torrentPeers;
2122    const tr_peer ** peers;
2123    const tr_webseed ** webseeds;
2124
2125    managerLock( t->manager );
2126
2127    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2128    size = tr_ptrArraySize( &t->peers );
2129
2130    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2131    *setmePeersConnected       = 0;
2132    *setmeSeedsConnected       = 0;
2133    *setmePeersGettingFromUs   = 0;
2134    *setmePeersSendingToUs     = 0;
2135    *setmeWebseedsSendingToUs  = 0;
2136
2137    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2138        setmePeersFrom[i] = 0;
2139
2140    for( i=0; i<size; ++i )
2141    {
2142        const tr_peer * peer = peers[i];
2143        const struct peer_atom * atom = peer->atom;
2144
2145        if( peer->io == NULL ) /* not connected */
2146            continue;
2147
2148        ++*setmePeersConnected;
2149
2150        ++setmePeersFrom[atom->from];
2151
2152        if( clientIsDownloadingFrom( tor, peer ) )
2153            ++*setmePeersSendingToUs;
2154
2155        if( clientIsUploadingTo( peer ) )
2156            ++*setmePeersGettingFromUs;
2157
2158        if( atom->flags & ADDED_F_SEED_FLAG )
2159            ++*setmeSeedsConnected;
2160    }
2161
2162    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2163    size = tr_ptrArraySize( &t->webseeds );
2164    for( i=0; i<size; ++i )
2165        if( tr_webseedIsActive( webseeds[i] ) )
2166            ++*setmeWebseedsSendingToUs;
2167
2168    managerUnlock( t->manager );
2169}
2170
2171float
2172tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2173{
2174    int i;
2175    float tmp;
2176    float ret = 0;
2177
2178    const Torrent * t = tor->torrentPeers;
2179    const int n = tr_ptrArraySize( &t->webseeds );
2180    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2181
2182    for( i=0; i<n; ++i )
2183        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2184            ret += tmp;
2185
2186    return ret;
2187}
2188
2189
2190float*
2191tr_peerMgrWebSpeeds( const tr_torrent * tor )
2192{
2193    const Torrent * t = tor->torrentPeers;
2194    const tr_webseed ** webseeds;
2195    int i;
2196    int webseedCount;
2197    float * ret;
2198    uint64_t now;
2199
2200    assert( t->manager );
2201    managerLock( t->manager );
2202
2203    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2204    webseedCount = tr_ptrArraySize( &t->webseeds );
2205    assert( webseedCount == tor->info.webseedCount );
2206    ret = tr_new0( float, webseedCount );
2207    now = tr_date( );
2208
2209    for( i=0; i<webseedCount; ++i )
2210        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2211            ret[i] = -1.0;
2212
2213    managerUnlock( t->manager );
2214    return ret;
2215}
2216
2217double
2218tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2219{
2220    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2221}
2222
2223
2224struct tr_peer_stat *
2225tr_peerMgrPeerStats( const tr_torrent    * tor,
2226                     int                 * setmeCount )
2227{
2228    int i, size;
2229    const Torrent * t = tor->torrentPeers;
2230    const tr_peer ** peers;
2231    tr_peer_stat * ret;
2232    uint64_t now;
2233
2234    assert( t->manager );
2235    managerLock( t->manager );
2236
2237    size = tr_ptrArraySize( &t->peers );
2238    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2239    ret = tr_new0( tr_peer_stat, size );
2240    now = tr_date( );
2241
2242    for( i=0; i<size; ++i )
2243    {
2244        char *                   pch;
2245        const tr_peer *          peer = peers[i];
2246        const struct peer_atom * atom = peer->atom;
2247        tr_peer_stat *           stat = ret + i;
2248
2249        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2250        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2251                   sizeof( stat->client ) );
2252        stat->port                = ntohs( peer->atom->port );
2253        stat->from                = atom->from;
2254        stat->progress            = peer->progress;
2255        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2256        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2257        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2258        stat->peerIsChoked        = peer->peerIsChoked;
2259        stat->peerIsInterested    = peer->peerIsInterested;
2260        stat->clientIsChoked      = peer->clientIsChoked;
2261        stat->clientIsInterested  = peer->clientIsInterested;
2262        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2263        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2264        stat->isUploadingTo       = clientIsUploadingTo( peer );
2265        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2266
2267        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC*1000 );
2268        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC*1000 );
2269        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC*1000 );
2270        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC*1000 );
2271
2272        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2273        stat->pendingReqsToClient = peer->pendingReqsToClient;
2274
2275        pch = stat->flagStr;
2276        if( t->optimistic == peer ) *pch++ = 'O';
2277        if( stat->isDownloadingFrom ) *pch++ = 'D';
2278        else if( stat->clientIsInterested ) *pch++ = 'd';
2279        if( stat->isUploadingTo ) *pch++ = 'U';
2280        else if( stat->peerIsInterested ) *pch++ = 'u';
2281        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2282        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2283        if( stat->isEncrypted ) *pch++ = 'E';
2284        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2285        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2286        if( stat->isIncoming ) *pch++ = 'I';
2287        *pch = '\0';
2288    }
2289
2290    *setmeCount = size;
2291
2292    managerUnlock( t->manager );
2293    return ret;
2294}
2295
2296/**
2297***
2298**/
2299
2300/* do we still want this piece and does the peer have it? */
2301static tr_bool
2302isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2303{
2304    return ( !tor->info.pieces[index].dnd ) /* we want it */
2305        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2306        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2307}
2308
2309/* does this peer have any pieces that we want? */
2310static tr_bool
2311isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2312{
2313    tr_piece_index_t i, n;
2314
2315    if ( tr_torrentIsSeed( tor ) )
2316        return FALSE;
2317
2318    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2319        return FALSE;
2320
2321    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2322        if( isPieceInteresting( tor, peer, i ) )
2323            return TRUE;
2324
2325    return FALSE;
2326}
2327
2328/* determines who we send "interested" messages to */
2329static void
2330rechokeDownloads( Torrent * t )
2331{
2332    int i;
2333    const uint64_t now = tr_date( );
2334    const int msec = 60 * 1000;
2335    const int MIN_INTERESTING_PEERS = 5;
2336    const int peerCount = tr_ptrArraySize( &t->peers );
2337    int maxPeers;
2338
2339    int badCount         = 0;
2340    int goodCount        = 0;
2341    int untestedCount    = 0;
2342    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2343    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2344    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2345
2346    /* decide how many peers to be interested in */
2347    {
2348        int blocks = 0;
2349        int cancels = 0;
2350
2351        /* Count up how many blocks & cancels each peer has.
2352         *
2353         * There are two situations where we send out cancels --
2354         *
2355         * 1. We've got unresponsive peers, which is handled by deciding
2356         *    -which- peers to be interested in.
2357         *
2358         * 2. We've hit our bandwidth cap, which is handled by deciding
2359         *    -how many- peers to be interested in.
2360         *
2361         * We're working on 2. here, so we need to ignore unresponsive
2362         * peers in our calculations lest they confuse Transmission into
2363         * think it's hit its bandwidth cap.
2364         */
2365        for( i=0; i<peerCount; ++i )
2366        {
2367            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2368            const int b = tr_historyGet( peer->blocksSentToClient, now, msec );
2369            const int c = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2370
2371            if( b == 0 ) /* ignore unresponsive peers, as described above */
2372                continue;
2373
2374            blocks += b;
2375            cancels += c;
2376        }
2377
2378        if( !t->interestedCount )
2379        {
2380            /* this is the torrent's first time to call this function...
2381             * start off optimistically by allowing interest in many peers */
2382            maxPeers = t->tor->maxConnectedPeers;
2383        }
2384        else if( !blocks )
2385        {
2386            /* we've gotten cancels but zero blocks...
2387             * something is seriously wrong.  throttle back sharply */
2388            maxPeers = t->interestedCount * 0.5;
2389        }
2390        else
2391        {
2392            const double c = ( cancels * 30.0 ) / blocks;
2393                 if( c > 5   ) maxPeers = t->interestedCount * 0.7;
2394            else if( c > 3   ) maxPeers = t->interestedCount * 0.8;
2395            else if( c > 1   ) maxPeers = t->interestedCount * 0.9;
2396            else if( c > 0.5 ) maxPeers = t->interestedCount;
2397            else               maxPeers = t->interestedCount + 1;
2398
2399            /* if things are getting worse, don't add more peers */
2400            if( ( t->downloadCongestion > 0.1 ) && ( c > t->downloadCongestion ) )
2401                maxPeers = MIN( maxPeers, t->interestedCount );
2402
2403            t->downloadCongestion = c;
2404/*fprintf( stderr, "OVERALL TORRENT DOWNLOAD CONGESTION: %.3f... num allowed goes from %d to %d\n", c, t->interestedCount, maxPeers );*/
2405        }
2406    }
2407
2408    /* don't let the previous paragraph's number tweaking go too far... */
2409    maxPeers = MAX( maxPeers, MIN_INTERESTING_PEERS );
2410    maxPeers = MIN( maxPeers, t->tor->maxConnectedPeers );
2411
2412    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2413     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2414     * That's the order in which we'll choose who to show interest in */
2415    for( i=0; i<peerCount; ++i )
2416    {
2417        tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2418
2419        if( !isPeerInteresting( t->tor, peer ) )
2420        {
2421            tr_peerMsgsSetInterested( peer->msgs, FALSE );
2422        }
2423        else
2424        {
2425            const int blocks = tr_historyGet( peer->blocksSentToClient, now, msec );
2426            const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2427
2428            if( !blocks && !cancels )
2429                untested[untestedCount++] = peer;
2430            else if( !cancels )
2431                good[goodCount++] = peer;
2432            else if( !blocks )
2433                bad[badCount++] = peer;
2434            else if( ( ( cancels * 10.0 ) / blocks ) < 1.0 )
2435                good[goodCount++] = peer;
2436            else
2437                bad[badCount++] = peer;
2438        }
2439    }
2440
2441    t->interestedCount = 0;
2442
2443    /* We've decided (1) how many peers to be interested in,
2444     * and (2) which peers are the best candidates,
2445     * Now it's time to update our `interest' flags. */
2446    for( i=0; i<goodCount; ++i ) {
2447        const tr_bool b = t->interestedCount < maxPeers;
2448        tr_peerMsgsSetInterested( good[i]->msgs, b );
2449        if( b )
2450            ++t->interestedCount;
2451    }
2452    for( i=0; i<untestedCount; ++i ) {
2453        const tr_bool b = t->interestedCount < maxPeers;
2454        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2455        if( b )
2456            ++t->interestedCount;
2457    }
2458    for( i=0; i<badCount; ++i ) {
2459        const tr_bool b = t->interestedCount < maxPeers;
2460        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2461        if( b )
2462            ++t->interestedCount;
2463    }
2464
2465/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2466
2467    /* cleanup */
2468    tr_free( untested );
2469    tr_free( good );
2470    tr_free( bad );
2471}
2472
2473/**
2474***
2475**/
2476
2477struct ChokeData
2478{
2479    tr_bool         doUnchoke;
2480    tr_bool         isInterested;
2481    tr_bool         isChoked;
2482    int             rate;
2483    tr_peer *       peer;
2484};
2485
2486static int
2487compareChoke( const void * va,
2488              const void * vb )
2489{
2490    const struct ChokeData * a = va;
2491    const struct ChokeData * b = vb;
2492
2493    if( a->rate != b->rate ) /* prefer higher overall speeds */
2494        return a->rate > b->rate ? -1 : 1;
2495
2496    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2497        return a->isChoked ? 1 : -1;
2498
2499    return 0;
2500}
2501
2502/* is this a new connection? */
2503static int
2504isNew( const tr_peer * peer )
2505{
2506    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2507}
2508
2509static void
2510rechokeUploads( Torrent * t, const uint64_t now )
2511{
2512    int i, size, unchokedInterested;
2513    const int peerCount = tr_ptrArraySize( &t->peers );
2514    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2515    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2516    const tr_session * session = t->manager->session;
2517    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2518
2519    assert( torrentIsLocked( t ) );
2520
2521    /* sort the peers by preference and rate */
2522    for( i = 0, size = 0; i < peerCount; ++i )
2523    {
2524        tr_peer * peer = peers[i];
2525        struct peer_atom * atom = peer->atom;
2526
2527        if( peer->progress >= 1.0 ) /* choke all seeds */
2528        {
2529            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2530        }
2531        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2532        {
2533            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2534        }
2535        else if( chokeAll ) /* choke everyone if we're not uploading */
2536        {
2537            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2538        }
2539        else
2540        {
2541            struct ChokeData * n = &choke[size++];
2542            n->peer         = peer;
2543            n->isInterested = peer->peerIsInterested;
2544            n->isChoked     = peer->peerIsChoked;
2545            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2546        }
2547    }
2548
2549    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2550
2551    /**
2552     * Reciprocation and number of uploads capping is managed by unchoking
2553     * the N peers which have the best upload rate and are interested.
2554     * This maximizes the client's download rate. These N peers are
2555     * referred to as downloaders, because they are interested in downloading
2556     * from the client.
2557     *
2558     * Peers which have a better upload rate (as compared to the downloaders)
2559     * but aren't interested get unchoked. If they become interested, the
2560     * downloader with the worst upload rate gets choked. If a client has
2561     * a complete file, it uses its upload rate rather than its download
2562     * rate to decide which peers to unchoke.
2563     */
2564    unchokedInterested = 0;
2565    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2566        choke[i].doUnchoke = 1;
2567        if( choke[i].isInterested )
2568            ++unchokedInterested;
2569    }
2570
2571    /* optimistic unchoke */
2572    if( i < size )
2573    {
2574        int n;
2575        struct ChokeData * c;
2576        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2577
2578        for( ; i<size; ++i )
2579        {
2580            if( choke[i].isInterested )
2581            {
2582                const tr_peer * peer = choke[i].peer;
2583                int x = 1, y;
2584                if( isNew( peer ) ) x *= 3;
2585                for( y=0; y<x; ++y )
2586                    tr_ptrArrayAppend( &randPool, &choke[i] );
2587            }
2588        }
2589
2590        if(( n = tr_ptrArraySize( &randPool )))
2591        {
2592            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2593            c->doUnchoke = 1;
2594            t->optimistic = c->peer;
2595        }
2596
2597        tr_ptrArrayDestruct( &randPool, NULL );
2598    }
2599
2600    for( i=0; i<size; ++i )
2601        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2602
2603    /* cleanup */
2604    tr_free( choke );
2605}
2606
2607static void
2608rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2609{
2610    uint64_t now;
2611    tr_torrent * tor = NULL;
2612    tr_peerMgr * mgr = vmgr;
2613    managerLock( mgr );
2614
2615    now = tr_date( );
2616    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2617        if( tor->isRunning ) {
2618            rechokeUploads( tor->torrentPeers, now );
2619            if( !tr_torrentIsSeed( tor ) )
2620                rechokeDownloads( tor->torrentPeers );
2621        }
2622    }
2623
2624    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2625    managerUnlock( mgr );
2626}
2627
2628/***
2629****
2630****  Life and Death
2631****
2632***/
2633
2634typedef enum
2635{
2636    TR_CAN_KEEP,
2637    TR_CAN_CLOSE,
2638    TR_MUST_CLOSE,
2639}
2640tr_close_type_t;
2641
2642static tr_close_type_t
2643shouldPeerBeClosed( const Torrent    * t,
2644                    const tr_peer    * peer,
2645                    int                peerCount,
2646                    const time_t       now )
2647{
2648    const tr_torrent *       tor = t->tor;
2649    const struct peer_atom * atom = peer->atom;
2650
2651    /* if it's marked for purging, close it */
2652    if( peer->doPurge )
2653    {
2654        tordbg( t, "purging peer %s because its doPurge flag is set",
2655                tr_atomAddrStr( atom ) );
2656        return TR_MUST_CLOSE;
2657    }
2658
2659    /* if we're seeding and the peer has everything we have,
2660     * and enough time has passed for a pex exchange, then disconnect */
2661    if( tr_torrentIsSeed( tor ) )
2662    {
2663        int peerHasEverything;
2664        if( atom->flags & ADDED_F_SEED_FLAG )
2665            peerHasEverything = TRUE;
2666        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2667            peerHasEverything = FALSE;
2668        else {
2669            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2670            tr_bitsetDifference( tmp, &peer->have );
2671            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2672            tr_bitfieldFree( tmp );
2673        }
2674
2675        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2676        {
2677            tordbg( t, "purging peer %s because we're both seeds",
2678                    tr_atomAddrStr( atom ) );
2679            return TR_MUST_CLOSE;
2680        }
2681    }
2682
2683    /* disconnect if it's been too long since piece data has been transferred.
2684     * this is on a sliding scale based on number of available peers... */
2685    {
2686        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2687        /* if we have >= relaxIfFewerThan, strictness is 100%.
2688         * if we have zero connections, strictness is 0% */
2689        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2690                               ? 1.0
2691                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2692        const int lo = MIN_UPLOAD_IDLE_SECS;
2693        const int hi = MAX_UPLOAD_IDLE_SECS;
2694        const int limit = hi - ( ( hi - lo ) * strictness );
2695        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2696/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2697        if( idleTime > limit ) {
2698            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2699                       tr_atomAddrStr( atom ), idleTime );
2700            return TR_CAN_CLOSE;
2701        }
2702    }
2703
2704    return TR_CAN_KEEP;
2705}
2706
2707static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2708
2709static tr_peer **
2710getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2711{
2712    int i, peerCount, outsize;
2713    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2714    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2715
2716    assert( torrentIsLocked( t ) );
2717
2718    for( i = outsize = 0; i < peerCount; ++i )
2719        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2720            ret[outsize++] = peers[i];
2721
2722    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2723
2724    *setmeSize = outsize;
2725    return ret;
2726}
2727
2728static int
2729compareCandidates( const void * va, const void * vb )
2730{
2731    const struct peer_atom * a = *(const struct peer_atom**) va;
2732    const struct peer_atom * b = *(const struct peer_atom**) vb;
2733
2734    /* <Charles> Here we would probably want to try reconnecting to
2735     * peers that had most recently given us data. Lots of users have
2736     * trouble with resets due to their routers and/or ISPs. This way we
2737     * can quickly recover from an unwanted reset. So we sort
2738     * piece_data_time in descending order.
2739     */
2740
2741    if( a->piece_data_time != b->piece_data_time )
2742        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2743
2744    if( a->numFails != b->numFails )
2745        return a->numFails < b->numFails ? -1 : 1;
2746
2747    if( a->time != b->time )
2748        return a->time < b->time ? -1 : 1;
2749
2750    /* In order to avoid fragmenting the swarm, peers from trackers and
2751     * from the DHT should be preferred to peers from PEX. */
2752    if( a->from != b->from )
2753        return a->from < b->from ? -1 : 1;
2754
2755    return 0;
2756}
2757
2758static int
2759getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2760{
2761    int sec;
2762
2763    /* if we were recently connected to this peer and transferring piece
2764     * data, try to reconnect to them sooner rather that later -- we don't
2765     * want network troubles to get in the way of a good peer. */
2766    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2767        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2768
2769    /* don't allow reconnects more often than our minimum */
2770    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2771        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2772
2773    /* otherwise, the interval depends on how many times we've tried
2774     * and failed to connect to the peer */
2775    else switch( atom->numFails ) {
2776        case 0: sec = 0; break;
2777        case 1: sec = 5; break;
2778        case 2: sec = 2 * 60; break;
2779        case 3: sec = 15 * 60; break;
2780        case 4: sec = 30 * 60; break;
2781        case 5: sec = 60 * 60; break;
2782        default: sec = 120 * 60; break;
2783    }
2784
2785    return sec;
2786}
2787
2788static struct peer_atom **
2789getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2790{
2791    int                 i, atomCount, retCount;
2792    struct peer_atom ** atoms;
2793    struct peer_atom ** ret;
2794    const int           seed = tr_torrentIsSeed( t->tor );
2795
2796    assert( torrentIsLocked( t ) );
2797
2798    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2799    ret = tr_new( struct peer_atom*, atomCount );
2800    for( i = retCount = 0; i < atomCount; ++i )
2801    {
2802        int                interval;
2803        struct peer_atom * atom = atoms[i];
2804
2805        /* peer fed us too much bad data ... we only keep it around
2806         * now to weed it out in case someone sends it to us via pex */
2807        if( atom->myflags & MYFLAG_BANNED )
2808            continue;
2809
2810        /* peer was unconnectable before, so we're not going to keep trying.
2811         * this is needs a separate flag from `banned', since if they try
2812         * to connect to us later, we'll let them in */
2813        if( atom->myflags & MYFLAG_UNREACHABLE )
2814            continue;
2815
2816        /* no need to connect if we're both seeds... */
2817        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2818                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2819            continue;
2820
2821        /* don't reconnect too often */
2822        interval = getReconnectIntervalSecs( atom, now );
2823        if( ( now - atom->time ) < interval )
2824        {
2825            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2826                    i, tr_atomAddrStr( atom ), interval );
2827            continue;
2828        }
2829
2830        /* Don't connect to peers in our blocklist */
2831        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2832            continue;
2833
2834        /* we don't need two connections to the same peer... */
2835        if( peerIsInUse( t, atom ) )
2836            continue;
2837
2838        ret[retCount++] = atom;
2839    }
2840
2841    if( retCount != 0 )
2842        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2843    *setmeSize = retCount;
2844    return ret;
2845}
2846
2847static void
2848closePeer( Torrent * t, tr_peer * peer )
2849{
2850    struct peer_atom * atom;
2851
2852    assert( t != NULL );
2853    assert( peer != NULL );
2854
2855    atom = peer->atom;
2856
2857    /* if we transferred piece data, then they might be good peers,
2858       so reset their `numFails' weight to zero.  otherwise we connected
2859       to them fruitlessly, so mark it as another fail */
2860    if( atom->piece_data_time )
2861        atom->numFails = 0;
2862    else
2863        ++atom->numFails;
2864
2865    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2866    removePeer( t, peer );
2867}
2868
2869/* Make a lot of slots available to newly-running torrents...
2870 * once they reach steady state, they shouldn't need as many */
2871static int
2872maxNewPeersPerPulse( const Torrent * t )
2873{
2874    double runTime;
2875    const tr_torrent * tor = t->tor;
2876
2877    assert( tr_isTorrent( tor ) );
2878
2879    runTime = difftime( tr_time( ), tor->startDate );
2880    if( runTime > 480 ) return 1;
2881    if( runTime > 240 ) return 2;
2882    if( runTime > 120 ) return 3;
2883    return 4;
2884}
2885
2886static void
2887reconnectTorrent( Torrent * t )
2888{
2889    static time_t prevTime = 0;
2890    static int    newConnectionsThisSecond = 0;
2891    const time_t  now = tr_time( );
2892
2893    if( prevTime != now )
2894    {
2895        prevTime = now;
2896        newConnectionsThisSecond = 0;
2897    }
2898
2899    if( !t->isRunning )
2900    {
2901        removeAllPeers( t );
2902    }
2903    else
2904    {
2905        int i;
2906        int mustCloseCount;
2907        int maxCandidates;
2908        struct tr_peer ** mustClose;
2909        const int maxPerPulse = maxNewPeersPerPulse( t );
2910
2911        /* disconnect the really bad peers */
2912        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2913        for( i=0; i<mustCloseCount; ++i )
2914            closePeer( t, mustClose[i] );
2915        tr_free( mustClose );
2916
2917        /* decide how many peers can we try to add in this pass */
2918        maxCandidates = maxPerPulse;
2919        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2920            maxCandidates /= 2;
2921        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2922        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2923
2924        /* select the best candidates, if they are requested */
2925        if( maxCandidates == 0 )
2926        {
2927            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2928                       "NO connection candidates needed, %d atoms, "
2929                       "max per pulse is %d",
2930                       t->tor->info.name, mustCloseCount,
2931                       tr_ptrArraySize( &t->pool ),
2932                       maxPerPulse );
2933
2934            tordbg( t, "maxCandidates is %d, maxPerPulse is %d, "
2935                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2936                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2937                       maxCandidates, maxPerPulse,
2938                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2939                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2940        }
2941        else
2942        {
2943            int canCloseCount = 0;
2944            int candidateCount;
2945            struct peer_atom ** candidates;
2946
2947            candidates = getPeerCandidates( t, now, &candidateCount );
2948            maxCandidates = MIN( maxCandidates, candidateCount );
2949
2950            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2951            if( maxCandidates != 0 )
2952            {
2953                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2954                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2955                   closePeer( t, canClose[i] );
2956                tr_free( canClose );
2957            }
2958
2959            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2960                       "%d can-close connections, %d connection candidates, "
2961                       "%d atoms, max per pulse is %d",
2962                       t->tor->info.name, mustCloseCount,
2963                       canCloseCount, candidateCount,
2964                       tr_ptrArraySize( &t->pool ), maxPerPulse );
2965
2966            tordbg( t, "candidateCount is %d, maxPerPulse is %d,"
2967                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2968                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2969                       candidateCount, maxPerPulse,
2970                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2971                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2972
2973            /* add some new ones */
2974            for( i=0; i<maxCandidates; ++i )
2975            {
2976                tr_peerMgr        * mgr = t->manager;
2977                struct peer_atom  * atom = candidates[i];
2978                tr_peerIo         * io;
2979
2980                tordbg( t, "Starting an OUTGOING connection with %s",
2981                        tr_atomAddrStr( atom ) );
2982
2983                io = tr_peerIoNewOutgoing( mgr->session,
2984                                           mgr->session->bandwidth,
2985                                           &atom->addr,
2986                                           atom->port,
2987                                           t->tor->info.hash,
2988                                           t->tor->completeness == TR_SEED );
2989
2990                if( io == NULL )
2991                {
2992                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2993                            tr_atomAddrStr( atom ) );
2994                    atom->myflags |= MYFLAG_UNREACHABLE;
2995                }
2996                else
2997                {
2998                    tr_handshake * handshake = tr_handshakeNew( io,
2999                                                                mgr->session->encryptionMode,
3000                                                                myHandshakeDoneCB,
3001                                                                mgr );
3002
3003                    assert( tr_peerIoGetTorrentHash( io ) );
3004
3005                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
3006
3007                    ++newConnectionsThisSecond;
3008
3009                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3010                                             handshakeCompare );
3011                }
3012
3013                atom->time = now;
3014            }
3015            tr_free( candidates );
3016        }
3017    }
3018}
3019
3020struct peer_liveliness
3021{
3022    tr_peer * peer;
3023    void * clientData;
3024    time_t pieceDataTime;
3025    time_t time;
3026    int speed;
3027    tr_bool doPurge;
3028};
3029
3030static int
3031comparePeerLiveliness( const void * va, const void * vb )
3032{
3033    const struct peer_liveliness * a = va;
3034    const struct peer_liveliness * b = vb;
3035
3036    if( a->doPurge != b->doPurge )
3037        return a->doPurge ? 1 : -1;
3038
3039    if( a->speed != b->speed ) /* faster goes first */
3040        return a->speed > b->speed ? -1 : 1;
3041
3042    /* the one to give us data more recently goes first */
3043    if( a->pieceDataTime != b->pieceDataTime )
3044        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3045
3046    /* the one we connected to most recently goes first */
3047    if( a->time != b->time )
3048        return a->time > b->time ? -1 : 1;
3049
3050    return 0;
3051}
3052
3053static int
3054comparePeerLivelinessReverse( const void * va, const void * vb )
3055{
3056    return -comparePeerLiveliness (va, vb);
3057}
3058
3059static void
3060sortPeersByLivelinessImpl( tr_peer  ** peers,
3061                           void     ** clientData,
3062                           int         n,
3063                           uint64_t    now,
3064                           int (*compare) ( const void *va, const void *vb ) )
3065{
3066    int i;
3067    struct peer_liveliness *lives, *l;
3068
3069    /* build a sortable array of peer + extra info */
3070    lives = l = tr_new0( struct peer_liveliness, n );
3071    for( i=0; i<n; ++i, ++l )
3072    {
3073        tr_peer * p = peers[i];
3074        l->peer = p;
3075        l->doPurge = p->doPurge;
3076        l->pieceDataTime = p->atom->piece_data_time;
3077        l->time = p->atom->time;
3078        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
3079                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
3080        if( clientData )
3081            l->clientData = clientData[i];
3082    }
3083
3084    /* sort 'em */
3085    assert( n == ( l - lives ) );
3086    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3087
3088    /* build the peer array */
3089    for( i=0, l=lives; i<n; ++i, ++l ) {
3090        peers[i] = l->peer;
3091        if( clientData )
3092            clientData[i] = l->clientData;
3093    }
3094    assert( n == ( l - lives ) );
3095
3096    /* cleanup */
3097    tr_free( lives );
3098}
3099
3100static void
3101sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3102{
3103    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3104}
3105
3106static void
3107sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3108{
3109    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
3110}
3111
3112
3113static void
3114enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3115{
3116    int n = tr_ptrArraySize( &t->peers );
3117    const int max = tr_torrentGetPeerLimit( t->tor );
3118    if( n > max )
3119    {
3120        void * base = tr_ptrArrayBase( &t->peers );
3121        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3122        sortPeersByLiveliness( peers, NULL, n, now );
3123        while( n > max )
3124            closePeer( t, peers[--n] );
3125        tr_free( peers );
3126    }
3127}
3128
3129static void
3130enforceSessionPeerLimit( tr_session * session, uint64_t now )
3131{
3132    int n = 0;
3133    tr_torrent * tor = NULL;
3134    const int max = tr_sessionGetPeerLimit( session );
3135
3136    /* count the total number of peers */
3137    while(( tor = tr_torrentNext( session, tor )))
3138        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3139
3140    /* if there are too many, prune out the worst */
3141    if( n > max )
3142    {
3143        tr_peer ** peers = tr_new( tr_peer*, n );
3144        Torrent ** torrents = tr_new( Torrent*, n );
3145
3146        /* populate the peer array */
3147        n = 0;
3148        tor = NULL;
3149        while(( tor = tr_torrentNext( session, tor ))) {
3150            int i;
3151            Torrent * t = tor->torrentPeers;
3152            const int tn = tr_ptrArraySize( &t->peers );
3153            for( i=0; i<tn; ++i, ++n ) {
3154                peers[n] = tr_ptrArrayNth( &t->peers, i );
3155                torrents[n] = t;
3156            }
3157        }
3158
3159        /* sort 'em */
3160        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3161
3162        /* cull out the crappiest */
3163        while( n-- > max )
3164            closePeer( torrents[n], peers[n] );
3165
3166        /* cleanup */
3167        tr_free( torrents );
3168        tr_free( peers );
3169    }
3170}
3171
3172struct reconnectTorrentStruct
3173{
3174    tr_torrent * torrent;
3175    int salt;
3176};
3177
3178static int
3179compareReconnectTorrents( const void * va, const void * vb )
3180{
3181    int ai, bi;
3182    const struct reconnectTorrentStruct * a = va;
3183    const struct reconnectTorrentStruct * b = vb;
3184
3185    /* primary key: higher priority goes first */
3186    ai = tr_torrentGetPriority( a->torrent );
3187    bi = tr_torrentGetPriority( b->torrent );
3188    if( ai != bi )
3189        return ai > bi ? -1 : 1;
3190
3191    /* secondary key: since users tend to stare at the screens
3192     * watching their downloads' progress, give downloads a
3193     * first shot at attempting outbound peer connections. */
3194    ai = tr_torrentIsSeed( a->torrent );
3195    bi = tr_torrentIsSeed( b->torrent );
3196    if( ai != bi )
3197        return bi ? -1 : 1;
3198
3199    /* tertiary key: random */
3200    if( a->salt != b->salt )
3201        return a->salt - b->salt;
3202
3203    return 0;
3204}
3205
3206static void
3207reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3208{
3209    tr_torrent * tor;
3210    tr_peerMgr * mgr = vmgr;
3211    struct reconnectTorrentStruct * torrents;
3212    int torrentCount;
3213    int i;
3214    uint64_t now;
3215    managerLock( mgr );
3216
3217    now = tr_date( );
3218
3219    /**
3220    ***  enforce the per-session and per-torrent peer limits
3221    **/
3222
3223    /* if we're over the per-torrent peer limits, cull some peers */
3224    tor = NULL;
3225    while(( tor = tr_torrentNext( mgr->session, tor )))
3226        if( tor->isRunning )
3227            enforceTorrentPeerLimit( tor->torrentPeers, now );
3228
3229    /* if we're over the per-session peer limits, cull some peers */
3230    enforceSessionPeerLimit( mgr->session, now );
3231
3232    /**
3233    ***  try to make new peer connections
3234    **/
3235
3236    torrentCount = 0;
3237    torrents = tr_new( struct reconnectTorrentStruct,
3238                       mgr->session->torrentCount );
3239    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3240        if( tor->isRunning ) {
3241            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3242            r->torrent = tor;
3243            r->salt = tr_cryptoWeakRandInt( 1024 );
3244        }
3245    }
3246    qsort( torrents,
3247           torrentCount, sizeof( struct reconnectTorrentStruct ),
3248           compareReconnectTorrents );
3249    for( i=0; i<torrentCount; ++i )
3250        reconnectTorrent( torrents[i].torrent->torrentPeers );
3251
3252
3253    /* cleanup */
3254    tr_free( torrents );
3255    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3256    managerUnlock( mgr );
3257}
3258
3259/****
3260*****
3261*****  BANDWIDTH ALLOCATION
3262*****
3263****/
3264
3265static void
3266pumpAllPeers( tr_peerMgr * mgr )
3267{
3268    tr_torrent * tor = NULL;
3269
3270    while(( tor = tr_torrentNext( mgr->session, tor )))
3271    {
3272        int j;
3273        Torrent * t = tor->torrentPeers;
3274
3275        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3276        {
3277            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3278            tr_peerMsgsPulse( peer->msgs );
3279        }
3280    }
3281}
3282
3283static void
3284bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3285{
3286    tr_torrent * tor;
3287    tr_peerMgr * mgr = vmgr;
3288    managerLock( mgr );
3289
3290    /* FIXME: this next line probably isn't necessary... */
3291    pumpAllPeers( mgr );
3292
3293    /* allocate bandwidth to the peers */
3294    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3295    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3296
3297    /* possibly stop torrents that have seeded enough */
3298    tor = NULL;
3299    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3300        if( tor->needsSeedRatioCheck ) {
3301            tor->needsSeedRatioCheck = FALSE;
3302            tr_torrentCheckSeedRatio( tor );
3303        }
3304    }
3305
3306    /* run the completeness check for any torrents that need it */
3307    tor = NULL;
3308    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3309        if( tor->torrentPeers->needsCompletenessCheck ) {
3310            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3311            tr_torrentRecheckCompleteness( tor );
3312        }
3313    }
3314
3315    /* possibly stop torrents that have an error */
3316    tor = NULL;
3317    while(( tor = tr_torrentNext( mgr->session, tor )))
3318        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3319            tr_torrentStop( tor );
3320
3321    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3322    managerUnlock( mgr );
3323}
3324
3325/***
3326****
3327***/
3328
3329static int
3330compareAtomPtrsByAddress( const void * va, const void *vb )
3331{
3332    const struct peer_atom * a = * (const struct peer_atom**) va;
3333    const struct peer_atom * b = * (const struct peer_atom**) vb;
3334
3335    assert( tr_isAtom( a ) );
3336    assert( tr_isAtom( b ) );
3337
3338    return tr_compareAddresses( &a->addr, &b->addr );
3339}
3340
3341/* best come first, worst go last */
3342static int
3343compareAtomPtrsByShelfDate( const void * va, const void *vb )
3344{
3345    time_t atime;
3346    time_t btime;
3347    const struct peer_atom * a = * (const struct peer_atom**) va;
3348    const struct peer_atom * b = * (const struct peer_atom**) vb;
3349    const int data_time_cutoff_secs = 60 * 60;
3350    const time_t tr_now = tr_time( );
3351
3352    assert( tr_isAtom( a ) );
3353    assert( tr_isAtom( b ) );
3354
3355    /* primary key: the last piece data time *if* it was within the last hour */
3356    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3357    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3358    if( atime != btime )
3359        return atime > btime ? -1 : 1;
3360
3361    /* secondary key: shelf date. */
3362    if( a->shelf_date != b->shelf_date )
3363        return a->shelf_date > b->shelf_date ? -1 : 1;
3364
3365    return 0;
3366}
3367
3368static int
3369getMaxAtomCount( const tr_torrent * tor )
3370{
3371    /* FIXME: this curve should be smoother... */
3372    const int n = tor->maxConnectedPeers;
3373    if( n >= 200 ) return n * 1.5;
3374    if( n >= 100 ) return n * 2;
3375    if( n >=  50 ) return n * 3;
3376    if( n >=  20 ) return n * 5;
3377    return n * 10;
3378}
3379
3380static void
3381atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3382{
3383    tr_torrent * tor = NULL;
3384    tr_peerMgr * mgr = vmgr;
3385    managerLock( mgr );
3386
3387    while(( tor = tr_torrentNext( mgr->session, tor )))
3388    {
3389        int atomCount;
3390        Torrent * t = tor->torrentPeers;
3391        const int maxAtomCount = getMaxAtomCount( tor );
3392        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3393
3394        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3395        {
3396            int i;
3397            int keepCount = 0;
3398            int testCount = 0;
3399            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3400            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3401
3402            /* keep the ones that are in use */
3403            for( i=0; i<atomCount; ++i ) {
3404                struct peer_atom * atom = atoms[i];
3405                if( peerIsInUse( t, atom ) )
3406                    keep[keepCount++] = atom;
3407                else
3408                    test[testCount++] = atom;
3409            }
3410
3411            /* if there's room, keep the best of what's left */
3412            i = 0;
3413            if( keepCount < maxAtomCount ) {
3414                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3415                while( i<testCount && keepCount<maxAtomCount )
3416                    keep[keepCount++] = test[i++];
3417            }
3418
3419            /* free the culled atoms */
3420            while( i<testCount )
3421                tr_free( test[i++] );
3422
3423            /* rebuild Torrent.pool with what's left */
3424            tr_ptrArrayDestruct( &t->pool, NULL );
3425            t->pool = TR_PTR_ARRAY_INIT;
3426            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3427            for( i=0; i<keepCount; ++i )
3428                tr_ptrArrayAppend( &t->pool, keep[i] );
3429
3430            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3431
3432            /* cleanup */
3433            tr_free( test );
3434            tr_free( keep );
3435        }
3436    }
3437
3438    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3439    managerUnlock( mgr );
3440}
Note: See TracBrowser for help on using the repository browser.