source: trunk/libtransmission/peer-mgr.c @ 10750

Last change on this file since 10750 was 10750, checked in by charles, 12 years ago

(trunk libT) when filtering out peers that aren't candidates, use the fastest filter tests first, so that the more expensive tests will be applied to fewer peers. In particular, call peerIsInUse() last.

  • Property svn:keywords set to Date Rev Author Id
File size: 100.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 10750 2010-06-14 11:57:46Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max number of peers to ask for per second overall.
64    * this throttle is to avoid overloading the router */
65    MAX_CONNECTIONS_PER_SECOND = 8,
66
67    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 5,
71
72    /* amount of time to keep a list of request pieces lying around
73       before it's considered too old and needs to be rebuilt */
74    PIECE_LIST_SHELF_LIFE_SECS = 60,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
86
87    /** how long we'll let requests we've made linger before we cancel them */
88    REQUEST_TTL_SECS = 120,
89
90    CANCEL_HISTORY_SEC = 120
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;              /* these match the added_f flags */
118    uint8_t     myflags;            /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
120    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
121
122    tr_port     port;
123    uint16_t    numFails;
124    time_t      time;               /* when the peer's connection status last changed */
125    time_t      piece_data_time;
126
127    time_t      lastConnectionAttemptAt;
128    time_t      lastConnectionAt;
129
130    /* similar to a TTL field, but less rigid --
131     * if the swarm is small, the atom will be kept past this date. */
132    time_t      shelf_date;
133    tr_peer   * peer;               /* will be NULL if not connected */
134    tr_address  addr;
135};
136
137#ifdef NDEBUG
138#define tr_isAtom(a) (TRUE)
139#else
140static tr_bool
141tr_isAtom( const struct peer_atom * atom )
142{
143    return ( atom != NULL )
144        && ( atom->from < TR_PEER_FROM__MAX )
145        && ( tr_isAddress( &atom->addr ) );
146}
147#endif
148
149static const char*
150tr_atomAddrStr( const struct peer_atom * atom )
151{
152    return tr_peerIoAddrStr( &atom->addr, atom->port );
153}
154
155struct block_request
156{
157    tr_block_index_t block;
158    tr_peer * peer;
159    time_t sentAt;
160};
161
162struct weighted_piece
163{
164    tr_piece_index_t index;
165    int16_t salt;
166    int16_t requestCount;
167};
168
169/** @brief Opaque, per-torrent data structure for peer connection information */
170typedef struct tr_torrent_peers
171{
172    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
173    tr_ptrArray                pool; /* struct peer_atom */
174    tr_ptrArray                peers; /* tr_peer */
175    tr_ptrArray                webseeds; /* tr_webseed */
176
177    tr_torrent               * tor;
178    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
179    struct tr_peerMgr        * manager;
180
181    tr_bool                    isRunning;
182    tr_bool                    needsCompletenessCheck;
183
184    struct block_request     * requests;
185    int                        requestCount;
186    int                        requestAlloc;
187
188    struct weighted_piece    * pieces;
189    int                        pieceCount;
190
191    int                        interestedCount;
192
193    /* An arbitrary metric of how congested the downloads are.
194     * Based on how many of requests are cancelled and how many are completed.
195     * Lower values indicate less congestion. */
196    double                     cancelRate;
197}
198Torrent;
199
200struct tr_peerMgr
201{
202    tr_session    * session;
203    tr_ptrArray     incomingHandshakes; /* tr_handshake */
204    struct event  * bandwidthTimer;
205    struct event  * rechokeTimer;
206    struct event  * refillUpkeepTimer;
207    struct event  * atomTimer;
208};
209
210#define tordbg( t, ... ) \
211    do { \
212        if( tr_deepLoggingIsActive( ) ) \
213            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
214    } while( 0 )
215
216#define dbgmsg( ... ) \
217    do { \
218        if( tr_deepLoggingIsActive( ) ) \
219            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
220    } while( 0 )
221
222/**
223***
224**/
225
226static inline void
227managerLock( const struct tr_peerMgr * manager )
228{
229    tr_sessionLock( manager->session );
230}
231
232static inline void
233managerUnlock( const struct tr_peerMgr * manager )
234{
235    tr_sessionUnlock( manager->session );
236}
237
238static inline void
239torrentLock( Torrent * torrent )
240{
241    managerLock( torrent->manager );
242}
243
244static inline void
245torrentUnlock( Torrent * torrent )
246{
247    managerUnlock( torrent->manager );
248}
249
250static inline int
251torrentIsLocked( const Torrent * t )
252{
253    return tr_sessionIsLocked( t->manager->session );
254}
255
256/**
257***
258**/
259
260static int
261handshakeCompareToAddr( const void * va, const void * vb )
262{
263    const tr_handshake * a = va;
264
265    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
266}
267
268static int
269handshakeCompare( const void * a, const void * b )
270{
271    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
272}
273
274static inline tr_handshake*
275getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
276{
277    if( tr_ptrArrayEmpty( handshakes ) )
278        return NULL;
279
280    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
281}
282
283static int
284comparePeerAtomToAddress( const void * va, const void * vb )
285{
286    const struct peer_atom * a = va;
287
288    return tr_compareAddresses( &a->addr, vb );
289}
290
291static int
292compareAtomsByAddress( const void * va, const void * vb )
293{
294    const struct peer_atom * b = vb;
295
296    assert( tr_isAtom( b ) );
297
298    return comparePeerAtomToAddress( va, &b->addr );
299}
300
301/**
302***
303**/
304
305const tr_address *
306tr_peerAddress( const tr_peer * peer )
307{
308    return &peer->atom->addr;
309}
310
311static Torrent*
312getExistingTorrent( tr_peerMgr *    manager,
313                    const uint8_t * hash )
314{
315    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
316
317    return tor == NULL ? NULL : tor->torrentPeers;
318}
319
320static int
321peerCompare( const void * a, const void * b )
322{
323    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
324}
325
326static struct peer_atom*
327getExistingAtom( const Torrent    * t,
328                 const tr_address * addr )
329{
330    Torrent * tt = (Torrent*)t;
331    assert( torrentIsLocked( t ) );
332    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
333}
334
335static tr_bool
336peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
337{
338    Torrent * t = (Torrent*) ct;
339
340    assert( torrentIsLocked ( t ) );
341
342    return ( atom->peer != NULL )
343        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
344        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
345}
346
347static tr_peer*
348peerConstructor( struct peer_atom * atom )
349{
350    tr_peer * peer = tr_new0( tr_peer, 1 );
351
352    tr_bitsetConstructor( &peer->have, 0 );
353
354    peer->atom = atom;
355    atom->peer = peer;
356
357    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
358    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
359    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
360    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
361
362    return peer;
363}
364
365static tr_peer*
366getPeer( Torrent * torrent, struct peer_atom * atom )
367{
368    tr_peer * peer;
369
370    assert( torrentIsLocked( torrent ) );
371
372    peer = atom->peer;
373
374    if( peer == NULL )
375    {
376        peer = peerConstructor( atom );
377        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
378    }
379
380    return peer;
381}
382
383static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
384
385static void
386peerDestructor( Torrent * t, tr_peer * peer )
387{
388    assert( peer != NULL );
389
390    peerDeclinedAllRequests( t, peer );
391
392    if( peer->msgs != NULL )
393    {
394        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
395        tr_peerMsgsFree( peer->msgs );
396    }
397
398    tr_peerIoClear( peer->io );
399    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
400
401    tr_historyFree( peer->blocksSentToClient  );
402    tr_historyFree( peer->blocksSentToPeer    );
403    tr_historyFree( peer->cancelsSentToClient );
404    tr_historyFree( peer->cancelsSentToPeer   );
405
406    tr_bitsetDestructor( &peer->have );
407    tr_bitfieldFree( peer->blame );
408    tr_free( peer->client );
409    peer->atom->peer = NULL;
410
411    tr_free( peer );
412}
413
414static void
415removePeer( Torrent * t, tr_peer * peer )
416{
417    tr_peer * removed;
418    struct peer_atom * atom = peer->atom;
419
420    assert( torrentIsLocked( t ) );
421    assert( atom );
422
423    atom->time = tr_time( );
424
425    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
426    assert( removed == peer );
427    peerDestructor( t, removed );
428}
429
430static void
431removeAllPeers( Torrent * t )
432{
433    while( !tr_ptrArrayEmpty( &t->peers ) )
434        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
435}
436
437static void
438torrentDestructor( void * vt )
439{
440    Torrent * t = vt;
441
442    assert( t );
443    assert( !t->isRunning );
444    assert( torrentIsLocked( t ) );
445    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
446    assert( tr_ptrArrayEmpty( &t->peers ) );
447
448    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
449    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
450    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
451    tr_ptrArrayDestruct( &t->peers, NULL );
452
453    tr_free( t->requests );
454    tr_free( t->pieces );
455    tr_free( t );
456}
457
458static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
459
460static Torrent*
461torrentConstructor( tr_peerMgr * manager,
462                    tr_torrent * tor )
463{
464    int       i;
465    Torrent * t;
466
467    t = tr_new0( Torrent, 1 );
468    t->manager = manager;
469    t->tor = tor;
470    t->pool = TR_PTR_ARRAY_INIT;
471    t->peers = TR_PTR_ARRAY_INIT;
472    t->webseeds = TR_PTR_ARRAY_INIT;
473    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
474
475    for( i = 0; i < tor->info.webseedCount; ++i )
476    {
477        tr_webseed * w =
478            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
479        tr_ptrArrayAppend( &t->webseeds, w );
480    }
481
482    return t;
483}
484
485tr_peerMgr*
486tr_peerMgrNew( tr_session * session )
487{
488    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
489    m->session = session;
490    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
491    return m;
492}
493
494static void
495deleteTimer( struct event ** t )
496{
497    if( *t != NULL )
498    {
499        evtimer_del( *t );
500        tr_free( *t );
501        *t = NULL;
502    }
503}
504
505static void
506deleteTimers( struct tr_peerMgr * m )
507{
508    deleteTimer( &m->atomTimer );
509    deleteTimer( &m->bandwidthTimer );
510    deleteTimer( &m->rechokeTimer );
511    deleteTimer( &m->refillUpkeepTimer );
512}
513
514void
515tr_peerMgrFree( tr_peerMgr * manager )
516{
517    managerLock( manager );
518
519    deleteTimers( manager );
520
521    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
522     * the item from manager->handshakes, so this is a little roundabout... */
523    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
524        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
525
526    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
527
528    managerUnlock( manager );
529    tr_free( manager );
530}
531
532static int
533clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
534{
535    if( !tr_torrentHasMetadata( tor ) )
536        return TRUE;
537
538    return peer->clientIsInterested && !peer->clientIsChoked;
539}
540
541static int
542clientIsUploadingTo( const tr_peer * peer )
543{
544    return peer->peerIsInterested && !peer->peerIsChoked;
545}
546
547/***
548****
549***/
550
551static void
552atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
553{
554    assert( atom != NULL );
555    assert( -1<=seedProbability && seedProbability<=100 );
556
557    atom->seedProbability = seedProbability;
558
559    if( seedProbability == 100 )
560        atom->flags |= ADDED_F_SEED_FLAG;
561    else if( seedProbability != -1 )
562        atom->flags &= ~ADDED_F_SEED_FLAG;
563}
564
565static void
566atomSetSeed( struct peer_atom * atom )
567{
568    atomSetSeedProbability( atom, 100 );
569}
570
571static inline tr_bool
572atomIsSeed( const struct peer_atom * atom )
573{
574    return atom->seedProbability == 100;
575}
576
577tr_bool
578tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
579                      const tr_address  * addr )
580{
581    tr_bool isSeed = FALSE;
582    const Torrent * t = tor->torrentPeers;
583    const struct peer_atom * atom = getExistingAtom( t, addr );
584
585    if( atom )
586        isSeed = atomIsSeed( atom );
587
588    return isSeed;
589}
590
591/**
592***  REQUESTS
593***
594*** There are two data structures associated with managing block requests:
595***
596*** 1. Torrent::requests, an array of "struct block_request" which keeps
597***    track of which blocks have been requested, and when, and by which peers.
598***    This is list is used for (a) cancelling requests that have been pending
599***    for too long and (b) avoiding duplicate requests before endgame.
600***
601*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
602***    pieces that we want to request.  It's used to decide which blocks to
603***    return next when tr_peerMgrGetBlockRequests() is called.
604**/
605
606/**
607*** struct block_request
608**/
609
610static int
611compareReqByBlock( const void * va, const void * vb )
612{
613    const struct block_request * a = va;
614    const struct block_request * b = vb;
615
616    /* primary key: block */
617    if( a->block < b->block ) return -1;
618    if( a->block > b->block ) return 1;
619
620    /* secondary key: peer */
621    if( a->peer < b->peer ) return -1;
622    if( a->peer > b->peer ) return 1;
623
624    return 0;
625}
626
627static void
628requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
629{
630    struct block_request key;
631
632    /* ensure enough room is available... */
633    if( t->requestCount + 1 >= t->requestAlloc )
634    {
635        const int CHUNK_SIZE = 128;
636        t->requestAlloc += CHUNK_SIZE;
637        t->requests = tr_renew( struct block_request,
638                                t->requests, t->requestAlloc );
639    }
640
641    /* populate the record we're inserting */
642    key.block = block;
643    key.peer = peer;
644    key.sentAt = tr_time( );
645
646    /* insert the request to our array... */
647    {
648        tr_bool exact;
649        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
650                                       sizeof( struct block_request ),
651                                       compareReqByBlock, &exact );
652        assert( !exact );
653        memmove( t->requests + pos + 1,
654                 t->requests + pos,
655                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
656        t->requests[pos] = key;
657    }
658
659    if( peer != NULL )
660    {
661        ++peer->pendingReqsToPeer;
662        assert( peer->pendingReqsToPeer >= 0 );
663    }
664
665    /*fprintf( stderr, "added request of block %lu from peer %s... "
666                       "there are now %d block\n",
667                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
668}
669
670static struct block_request *
671requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
672{
673    struct block_request key;
674    key.block = block;
675    key.peer = (tr_peer*) peer;
676
677    return bsearch( &key, t->requests, t->requestCount,
678                    sizeof( struct block_request ),
679                    compareReqByBlock );
680}
681
682/* how many peers are we currently requesting this block from... */
683static int
684countBlockRequests( Torrent * t, tr_block_index_t block )
685{
686    tr_bool exact;
687    int i, n, pos;
688    struct block_request key;
689
690    key.block = block;
691    key.peer = NULL;
692    pos = tr_lowerBound( &key, t->requests, t->requestCount,
693                         sizeof( struct block_request ),
694                         compareReqByBlock, &exact );
695
696    assert( !exact ); /* shouldn't have a request with .peer == NULL */
697
698    n = 0;
699    for( i=pos; i<t->requestCount; ++i ) {
700        if( t->requests[i].block == block )
701            ++n;
702        else
703            break;
704    }
705
706    return n;
707}
708
709static void
710decrementPendingReqCount( const struct block_request * b )
711{
712    if( b->peer != NULL )
713        if( b->peer->pendingReqsToPeer > 0 )
714            --b->peer->pendingReqsToPeer;
715}
716
717static void
718requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
719{
720    const struct block_request * b = requestListLookup( t, block, peer );
721    if( b != NULL )
722    {
723        const int pos = b - t->requests;
724        assert( pos < t->requestCount );
725
726        decrementPendingReqCount( b );
727
728        tr_removeElementFromArray( t->requests,
729                                   pos,
730                                   sizeof( struct block_request ),
731                                   t->requestCount-- );
732
733        /*fprintf( stderr, "removing request of block %lu from peer %s... "
734                           "there are now %d block requests left\n",
735                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
736    }
737}
738
739/**
740*** struct weighted_piece
741**/
742
743enum
744{
745    PIECES_UNSORTED,
746    PIECES_SORTED_BY_INDEX,
747    PIECES_SORTED_BY_WEIGHT
748};
749
750const tr_torrent * weightTorrent;
751
752/* we try to create a "weight" s.t. high-priority pieces come before others,
753 * and that partially-complete pieces come before empty ones. */
754static int
755comparePieceByWeight( const void * va, const void * vb )
756{
757    const struct weighted_piece * a = va;
758    const struct weighted_piece * b = vb;
759    int ia, ib, missing, pending;
760    const tr_torrent * tor = weightTorrent;
761
762    /* primary key: weight */
763    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
764    pending = a->requestCount;
765    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
766    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
767    pending = b->requestCount;
768    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
769    if( ia < ib ) return -1;
770    if( ia > ib ) return 1;
771
772    /* secondary key: higher priorities go first */
773    ia = tor->info.pieces[a->index].priority;
774    ib = tor->info.pieces[b->index].priority;
775    if( ia > ib ) return -1;
776    if( ia < ib ) return 1;
777
778    /* tertiary key: random */
779    if( a->salt < b->salt ) return -1;
780    if( a->salt > b->salt ) return 1;
781
782    /* okay, they're equal */
783    return 0;
784}
785
786static int
787comparePieceByIndex( const void * va, const void * vb )
788{
789    const struct weighted_piece * a = va;
790    const struct weighted_piece * b = vb;
791    if( a->index < b->index ) return -1;
792    if( a->index > b->index ) return 1;
793    return 0;
794}
795
796static void
797pieceListSort( Torrent * t, int mode )
798{
799    int(*compar)(const void *, const void *);
800
801    assert( mode==PIECES_SORTED_BY_INDEX
802         || mode==PIECES_SORTED_BY_WEIGHT );
803
804    switch( mode ) {
805        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
806        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
807        default: assert( 0 && "unhandled" );  break;
808    }
809
810    weightTorrent = t->tor;
811    qsort( t->pieces, t->pieceCount,
812           sizeof( struct weighted_piece ), compar );
813}
814
815static tr_bool
816isInEndgame( Torrent * t )
817{
818    tr_bool endgame = FALSE;
819
820    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
821    {
822        const struct weighted_piece * p = t->pieces;
823        const int pending = p->requestCount;
824        const int missing = tr_cpMissingBlocksInPiece( &t->tor->completion, p->index );
825        endgame = pending >= missing;
826    }
827
828    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
829    return endgame;
830}
831
832/**
833 * This function is useful for sanity checking,
834 * but is too expensive even for nightly builds...
835 * let's leave it disabled but add an easy hook to compile it back in
836 */
837#if 0
838static void
839assertWeightedPiecesAreSorted( Torrent * t )
840{
841    if( !isInEndgame( t ) )
842    {
843        int i;
844        weightTorrent = t->tor;
845        for( i=0; i<t->pieceCount-1; ++i )
846            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
847    }
848}
849#else
850#define assertWeightedPiecesAreSorted(t)
851#endif
852
853static struct weighted_piece *
854pieceListLookup( Torrent * t, tr_piece_index_t index )
855{
856    int i;
857
858    for( i=0; i<t->pieceCount; ++i )
859        if( t->pieces[i].index == index )
860            return &t->pieces[i];
861
862    return NULL;
863}
864
865static void
866pieceListRebuild( Torrent * t )
867{
868    assertWeightedPiecesAreSorted( t );
869
870    if( !tr_torrentIsSeed( t->tor ) )
871    {
872        tr_piece_index_t i;
873        tr_piece_index_t * pool;
874        tr_piece_index_t poolCount = 0;
875        const tr_torrent * tor = t->tor;
876        const tr_info * inf = tr_torrentInfo( tor );
877        struct weighted_piece * pieces;
878        int pieceCount;
879
880        /* build the new list */
881        pool = tr_new( tr_piece_index_t, inf->pieceCount );
882        for( i=0; i<inf->pieceCount; ++i )
883            if( !inf->pieces[i].dnd )
884                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
885                    pool[poolCount++] = i;
886        pieceCount = poolCount;
887        pieces = tr_new0( struct weighted_piece, pieceCount );
888        for( i=0; i<poolCount; ++i ) {
889            struct weighted_piece * piece = pieces + i;
890            piece->index = pool[i];
891            piece->requestCount = 0;
892            piece->salt = tr_cryptoWeakRandInt( 4096 );
893        }
894
895        /* if we already had a list of pieces, merge it into
896         * the new list so we don't lose its requestCounts */
897        if( t->pieces != NULL )
898        {
899            struct weighted_piece * o = t->pieces;
900            struct weighted_piece * oend = o + t->pieceCount;
901            struct weighted_piece * n = pieces;
902            struct weighted_piece * nend = n + pieceCount;
903
904            pieceListSort( t, PIECES_SORTED_BY_INDEX );
905
906            while( o!=oend && n!=nend ) {
907                if( o->index < n->index )
908                    ++o;
909                else if( o->index > n->index )
910                    ++n;
911                else
912                    *n++ = *o++;
913            }
914
915            tr_free( t->pieces );
916        }
917
918        t->pieces = pieces;
919        t->pieceCount = pieceCount;
920
921        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
922
923        /* cleanup */
924        tr_free( pool );
925    }
926}
927
928static void
929pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
930{
931    struct weighted_piece * p;
932
933    assertWeightedPiecesAreSorted( t );
934
935    if(( p = pieceListLookup( t, piece )))
936    {
937        const int pos = p - t->pieces;
938
939        tr_removeElementFromArray( t->pieces,
940                                   pos,
941                                   sizeof( struct weighted_piece ),
942                                   t->pieceCount-- );
943
944        if( t->pieceCount == 0 )
945        {
946            tr_free( t->pieces );
947            t->pieces = NULL;
948        }
949    }
950
951    assertWeightedPiecesAreSorted( t );
952}
953
954static void
955pieceListResortPiece( Torrent * t, struct weighted_piece * p )
956{
957    int pos;
958    tr_bool isSorted = TRUE;
959
960    if( p == NULL )
961        return;
962
963    /* is the torrent already sorted? */
964    pos = p - t->pieces;
965    weightTorrent = t->tor;
966    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
967        isSorted = FALSE;
968    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
969        isSorted = FALSE;
970
971    /* if it's not sorted, move it around */
972    if( !isSorted )
973    {
974        tr_bool exact;
975        const struct weighted_piece tmp = *p;
976
977        tr_removeElementFromArray( t->pieces,
978                                   pos,
979                                   sizeof( struct weighted_piece ),
980                                   t->pieceCount-- );
981
982        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
983                             sizeof( struct weighted_piece ),
984                             comparePieceByWeight, &exact );
985
986        memmove( &t->pieces[pos + 1],
987                 &t->pieces[pos],
988                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
989
990        t->pieces[pos] = tmp;
991    }
992
993    assertWeightedPiecesAreSorted( t );
994}
995
996static void
997pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
998{
999    struct weighted_piece * p;
1000    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1001
1002    assertWeightedPiecesAreSorted( t );
1003
1004    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1005    {
1006        --p->requestCount;
1007        pieceListResortPiece( t, p );
1008    }
1009
1010    assertWeightedPiecesAreSorted( t );
1011}
1012
1013/**
1014***
1015**/
1016
1017void
1018tr_peerMgrRebuildRequests( tr_torrent * tor )
1019{
1020    assert( tr_isTorrent( tor ) );
1021
1022    pieceListRebuild( tor->torrentPeers );
1023}
1024
1025void
1026tr_peerMgrGetNextRequests( tr_torrent           * tor,
1027                           tr_peer              * peer,
1028                           int                    numwant,
1029                           tr_block_index_t     * setme,
1030                           int                  * numgot )
1031{
1032    int i;
1033    int got;
1034    Torrent * t;
1035    tr_bool endgame;
1036    struct weighted_piece * pieces;
1037    const tr_bitset * have = &peer->have;
1038
1039    /* sanity clause */
1040    assert( tr_isTorrent( tor ) );
1041    assert( peer->clientIsInterested );
1042    assert( !peer->clientIsChoked );
1043    assert( numwant > 0 );
1044
1045    /* walk through the pieces and find blocks that should be requested */
1046    got = 0;
1047    t = tor->torrentPeers;
1048    assertWeightedPiecesAreSorted( t );
1049
1050    /* prep the pieces list */
1051    if( t->pieces == NULL )
1052        pieceListRebuild( t );
1053
1054    endgame = isInEndgame( t );
1055
1056    pieces = t->pieces;
1057    for( i=0; i<t->pieceCount && got<numwant; ++i )
1058    {
1059        struct weighted_piece * p = pieces + i;
1060        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1061        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1062
1063        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1064            continue;
1065
1066        /* if the peer has this piece that we want... */
1067        if( tr_bitsetHasFast( have, p->index ) )
1068        {
1069            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1070            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1071
1072            for( ; b!=e && got<numwant; ++b )
1073            {
1074                /* don't request blocks we've already got */
1075                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1076                    continue;
1077
1078                /* don't send the same request to the same peer twice */
1079                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1080                    continue;
1081
1082                /* don't send the same request to any peer too many times */
1083                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1084                    continue;
1085
1086                /* update the caller's table */
1087                setme[got++] = b;
1088
1089                /* update our own tables */
1090                requestListAdd( t, b, peer );
1091                ++p->requestCount;
1092            }
1093        }
1094    }
1095
1096    /* In most cases we've just changed the weights of a small number of pieces.
1097     * So rather than qsort()ing the entire array, it's faster to apply an
1098     * adaptive insertion sort algorithm. */
1099    if( got > 0 )
1100    {
1101        /* not enough requests || last piece modified */
1102        if ( i == t->pieceCount ) --i;
1103
1104        weightTorrent = t->tor;
1105        while( --i >= 0 )
1106        {
1107            tr_bool exact;
1108
1109            /* relative position! */
1110            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1111                                              t->pieceCount - (i + 1),
1112                                              sizeof( struct weighted_piece ),
1113                                              comparePieceByWeight, &exact );
1114            if( newpos > 0 )
1115            {
1116                const struct weighted_piece piece = t->pieces[i];
1117                memmove( &t->pieces[i],
1118                         &t->pieces[i + 1],
1119                         sizeof( struct weighted_piece ) * ( newpos ) );
1120                t->pieces[i + newpos] = piece;
1121            }
1122        }
1123    }
1124
1125    assertWeightedPiecesAreSorted( t );
1126    *numgot = got;
1127}
1128
1129tr_bool
1130tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1131                          const tr_peer     * peer,
1132                          tr_block_index_t    block )
1133{
1134    const Torrent * t = tor->torrentPeers;
1135    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1136}
1137
1138/* cancel requests that are too old */
1139static void
1140refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1141{
1142    time_t now;
1143    uint64_t now_msec;
1144    time_t too_old;
1145    tr_torrent * tor;
1146    tr_peerMgr * mgr = vmgr;
1147    managerLock( mgr );
1148
1149    now = tr_time( );
1150    now_msec = tr_date( );
1151    too_old = now - REQUEST_TTL_SECS;
1152
1153    tor = NULL;
1154    while(( tor = tr_torrentNext( mgr->session, tor )))
1155    {
1156        Torrent * t = tor->torrentPeers;
1157        const int n = t->requestCount;
1158        if( n > 0 )
1159        {
1160            int keepCount = 0;
1161            int cancelCount = 0;
1162            struct block_request * cancel = tr_new( struct block_request, n );
1163            const struct block_request * it;
1164            const struct block_request * end;
1165
1166            for( it=t->requests, end=it+n; it!=end; ++it )
1167            {
1168                if( ( it->sentAt <= too_old ) && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1169                    cancel[cancelCount++] = *it;
1170                else
1171                {
1172                    if( it != &t->requests[keepCount] )
1173                        t->requests[keepCount] = *it;
1174                    keepCount++;
1175                }
1176            }
1177
1178            /* prune out the ones we aren't keeping */
1179            t->requestCount = keepCount;
1180
1181            /* send cancel messages for all the "cancel" ones */
1182            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1183                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1184                    tr_historyAdd( it->peer->cancelsSentToPeer, now_msec, 1 );
1185                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1186                    decrementPendingReqCount( it );
1187                }
1188            }
1189
1190            /* decrement the pending request counts for the timed-out blocks */
1191            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1192                pieceListRemoveRequest( t, it->block );
1193
1194            /* cleanup loop */
1195            tr_free( cancel );
1196        }
1197    }
1198
1199    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1200    managerUnlock( mgr );
1201}
1202
1203static void
1204addStrike( Torrent * t, tr_peer * peer )
1205{
1206    tordbg( t, "increasing peer %s strike count to %d",
1207            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1208
1209    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1210    {
1211        struct peer_atom * atom = peer->atom;
1212        atom->myflags |= MYFLAG_BANNED;
1213        peer->doPurge = 1;
1214        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1215    }
1216}
1217
1218static void
1219gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1220{
1221    tr_torrent *   tor = t->tor;
1222    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1223
1224    tor->corruptCur += byteCount;
1225    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1226
1227    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1228}
1229
1230static void
1231peerSuggestedPiece( Torrent            * t UNUSED,
1232                    tr_peer            * peer UNUSED,
1233                    tr_piece_index_t     pieceIndex UNUSED,
1234                    int                  isFastAllowed UNUSED )
1235{
1236#if 0
1237    assert( t );
1238    assert( peer );
1239    assert( peer->msgs );
1240
1241    /* is this a valid piece? */
1242    if(  pieceIndex >= t->tor->info.pieceCount )
1243        return;
1244
1245    /* don't ask for it if we've already got it */
1246    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1247        return;
1248
1249    /* don't ask for it if they don't have it */
1250    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1251        return;
1252
1253    /* don't ask for it if we're choked and it's not fast */
1254    if( !isFastAllowed && peer->clientIsChoked )
1255        return;
1256
1257    /* request the blocks that we don't have in this piece */
1258    {
1259        tr_block_index_t block;
1260        const tr_torrent * tor = t->tor;
1261        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1262        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1263
1264        for( block=start; block<end; ++block )
1265        {
1266            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1267            {
1268                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1269                const uint32_t length = tr_torBlockCountBytes( tor, block );
1270                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1271                incrementPieceRequests( t, pieceIndex );
1272            }
1273        }
1274    }
1275#endif
1276}
1277
1278static void
1279removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1280{
1281    requestListRemove( t, block, peer );
1282    pieceListRemoveRequest( t, block );
1283}
1284
1285/* peer choked us, or maybe it disconnected.
1286   either way we need to remove all its requests */
1287static void
1288peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1289{
1290    int i, n;
1291    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1292
1293    for( i=n=0; i<t->requestCount; ++i )
1294        if( peer == t->requests[i].peer )
1295            blocks[n++] = t->requests[i].block;
1296
1297    for( i=0; i<n; ++i )
1298        removeRequestFromTables( t, blocks[i], peer );
1299
1300    tr_free( blocks );
1301}
1302
1303static void
1304peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1305{
1306    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1307    Torrent * t = vt;
1308    const tr_peer_event * e = vevent;
1309
1310    torrentLock( t );
1311
1312    switch( e->eventType )
1313    {
1314        case TR_PEER_PEER_GOT_DATA:
1315        {
1316            const time_t now = tr_time( );
1317            tr_torrent * tor = t->tor;
1318
1319            tr_torrentSetActivityDate( tor, now );
1320
1321            if( e->wasPieceData ) {
1322                tor->uploadedCur += e->length;
1323                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1324                tr_torrentSetDirty( tor );
1325            }
1326
1327            /* update the stats */
1328            if( e->wasPieceData )
1329                tr_statsAddUploaded( tor->session, e->length );
1330
1331            /* update our atom */
1332            if( peer && e->wasPieceData )
1333                peer->atom->piece_data_time = now;
1334
1335            break;
1336        }
1337
1338        case TR_PEER_CLIENT_GOT_REJ:
1339            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1340            break;
1341
1342        case TR_PEER_CLIENT_GOT_CHOKE:
1343            peerDeclinedAllRequests( t, peer );
1344            break;
1345
1346        case TR_PEER_CLIENT_GOT_PORT:
1347            if( peer )
1348                peer->atom->port = e->port;
1349            break;
1350
1351        case TR_PEER_CLIENT_GOT_SUGGEST:
1352            if( peer )
1353                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1354            break;
1355
1356        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1357            if( peer )
1358                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1359            break;
1360
1361        case TR_PEER_CLIENT_GOT_DATA:
1362        {
1363            const time_t now = tr_time( );
1364            tr_torrent * tor = t->tor;
1365
1366            tr_torrentSetActivityDate( tor, now );
1367
1368            if( e->wasPieceData ) {
1369                tor->downloadedCur += e->length;
1370                tr_torrentSetDirty( tor );
1371            }
1372
1373            /* update the stats */
1374            if( e->wasPieceData )
1375                tr_statsAddDownloaded( tor->session, e->length );
1376
1377            /* update our atom */
1378            if( peer && e->wasPieceData )
1379                peer->atom->piece_data_time = now;
1380
1381            break;
1382        }
1383
1384        case TR_PEER_PEER_PROGRESS:
1385        {
1386            if( peer )
1387            {
1388                struct peer_atom * atom = peer->atom;
1389                if( e->progress >= 1.0 ) {
1390                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1391                    atomSetSeed( atom );
1392                }
1393            }
1394            break;
1395        }
1396
1397        case TR_PEER_CLIENT_GOT_BLOCK:
1398        {
1399            tr_torrent * tor = t->tor;
1400            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1401
1402            requestListRemove( t, block, peer );
1403            pieceListRemoveRequest( t, block );
1404
1405            if( peer != NULL )
1406                tr_historyAdd( peer->blocksSentToClient, tr_date( ), 1 );
1407
1408            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1409            {
1410                /* we already have this block... */
1411                const uint32_t n = tr_torBlockCountBytes( tor, block );
1412                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1413                tordbg( t, "we have this block already..." );
1414            }
1415            else
1416            {
1417                tr_cpBlockAdd( &tor->completion, block );
1418                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1419                tr_torrentSetDirty( tor );
1420
1421                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1422                {
1423                    const tr_piece_index_t p = e->pieceIndex;
1424                    const tr_bool ok = tr_ioTestPiece( tor, p );
1425
1426                    if( !ok )
1427                    {
1428                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1429                                   (unsigned long)p );
1430                    }
1431
1432                    tr_torrentSetHasPiece( tor, p, ok );
1433                    tr_torrentSetPieceChecked( tor, p, TRUE );
1434                    tr_peerMgrSetBlame( tor, p, ok );
1435
1436                    if( !ok )
1437                    {
1438                        gotBadPiece( t, p );
1439                    }
1440                    else
1441                    {
1442                        int i;
1443                        int peerCount;
1444                        tr_peer ** peers;
1445                        tr_file_index_t fileIndex;
1446
1447                        /* only add this to downloadedCur if we got it from a peer --
1448                         * webseeds shouldn't count against our ratio.  As one tracker
1449                         * admin put it, "Those pieces are downloaded directly from the
1450                         * content distributor, not the peers, it is the tracker's job
1451                         * to manage the swarms, not the web server and does not fit
1452                         * into the jurisdiction of the tracker." */
1453                        if( peer != NULL ) {
1454                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1455                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1456                        }
1457
1458                        peerCount = tr_ptrArraySize( &t->peers );
1459                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1460                        for( i=0; i<peerCount; ++i )
1461                            tr_peerMsgsHave( peers[i]->msgs, p );
1462
1463                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1464                            const tr_file * file = &tor->info.files[fileIndex];
1465                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1466                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1467                                    tr_torrentFileCompleted( tor, fileIndex );
1468                        }
1469
1470                        pieceListRemovePiece( t, p );
1471                    }
1472                }
1473
1474                t->needsCompletenessCheck = TRUE;
1475            }
1476            break;
1477        }
1478
1479        case TR_PEER_ERROR:
1480            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1481            {
1482                /* some protocol error from the peer */
1483                peer->doPurge = 1;
1484                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1485                        tr_atomAddrStr( peer->atom ) );
1486            }
1487            else
1488            {
1489                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1490            }
1491            break;
1492
1493        default:
1494            assert( 0 );
1495    }
1496
1497    torrentUnlock( t );
1498}
1499
1500static int
1501getDefaultShelfLife( uint8_t from )
1502{
1503    /* in general, peers obtained from firsthand contact
1504     * are better than those from secondhand, etc etc */
1505    switch( from )
1506    {
1507        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1508        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1509        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1510        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1511        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1512        case TR_PEER_FROM_RESUME   : return 60 * 60;
1513        case TR_PEER_FROM_LPD      : return 10 * 60;
1514        default                    : return 60 * 60;
1515    }
1516}
1517
1518static void
1519ensureAtomExists( Torrent           * t,
1520                  const tr_address  * addr,
1521                  const tr_port       port,
1522                  const uint8_t       flags,
1523                  const int8_t        seedProbability,
1524                  const uint8_t       from )
1525{
1526    struct peer_atom * a;
1527
1528    assert( tr_isAddress( addr ) );
1529    assert( from < TR_PEER_FROM__MAX );
1530
1531    a = getExistingAtom( t, addr );
1532
1533    if( a == NULL )
1534    {
1535        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1536        a = tr_new0( struct peer_atom, 1 );
1537        a->addr = *addr;
1538        a->port = port;
1539        a->flags = flags;
1540        a->from = from;
1541        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1542        atomSetSeedProbability( a, seedProbability );
1543        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1544
1545        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1546    }
1547    else if( a->seedProbability == -1 )
1548    {
1549        atomSetSeedProbability( a, seedProbability );
1550    }
1551}
1552
1553static int
1554getMaxPeerCount( const tr_torrent * tor )
1555{
1556    return tor->maxConnectedPeers;
1557}
1558
1559static int
1560getPeerCount( const Torrent * t )
1561{
1562    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1563}
1564
1565/* FIXME: this is kind of a mess. */
1566static tr_bool
1567myHandshakeDoneCB( tr_handshake  * handshake,
1568                   tr_peerIo     * io,
1569                   tr_bool         readAnythingFromPeer,
1570                   tr_bool         isConnected,
1571                   const uint8_t * peer_id,
1572                   void          * vmanager )
1573{
1574    tr_bool            ok = isConnected;
1575    tr_bool            success = FALSE;
1576    tr_port            port;
1577    const tr_address * addr;
1578    tr_peerMgr       * manager = vmanager;
1579    Torrent          * t;
1580    tr_handshake     * ours;
1581
1582    assert( io );
1583    assert( tr_isBool( ok ) );
1584
1585    t = tr_peerIoHasTorrentHash( io )
1586        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1587        : NULL;
1588
1589    if( tr_peerIoIsIncoming ( io ) )
1590        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1591                                        handshake, handshakeCompare );
1592    else if( t )
1593        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1594                                        handshake, handshakeCompare );
1595    else
1596        ours = handshake;
1597
1598    assert( ours );
1599    assert( ours == handshake );
1600
1601    if( t )
1602        torrentLock( t );
1603
1604    addr = tr_peerIoGetAddress( io, &port );
1605
1606    if( !ok || !t || !t->isRunning )
1607    {
1608        if( t )
1609        {
1610            struct peer_atom * atom = getExistingAtom( t, addr );
1611            if( atom )
1612            {
1613                ++atom->numFails;
1614
1615                if( !readAnythingFromPeer )
1616                {
1617                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1618                    atom->myflags |= MYFLAG_UNREACHABLE;
1619                }
1620            }
1621        }
1622    }
1623    else /* looking good */
1624    {
1625        struct peer_atom * atom;
1626
1627        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1628        atom = getExistingAtom( t, addr );
1629        atom->time = tr_time( );
1630        atom->piece_data_time = 0;
1631        atom->lastConnectionAt = tr_time( );
1632        atom->myflags &= ~MYFLAG_UNREACHABLE;
1633
1634        if( atom->myflags & MYFLAG_BANNED )
1635        {
1636            tordbg( t, "banned peer %s tried to reconnect",
1637                    tr_atomAddrStr( atom ) );
1638        }
1639        else if( tr_peerIoIsIncoming( io )
1640               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1641
1642        {
1643        }
1644        else
1645        {
1646            tr_peer * peer = atom->peer;
1647
1648            if( peer ) /* we already have this peer */
1649            {
1650            }
1651            else
1652            {
1653                peer = getPeer( t, atom );
1654                tr_free( peer->client );
1655
1656                if( !peer_id )
1657                    peer->client = NULL;
1658                else {
1659                    char client[128];
1660                    tr_clientForId( client, sizeof( client ), peer_id );
1661                    peer->client = tr_strdup( client );
1662                }
1663
1664                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1665                                                                balanced by our unref in peerDestructor()  */
1666                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1667                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1668
1669                success = TRUE;
1670            }
1671        }
1672    }
1673
1674    if( t )
1675        torrentUnlock( t );
1676
1677    return success;
1678}
1679
1680void
1681tr_peerMgrAddIncoming( tr_peerMgr * manager,
1682                       tr_address * addr,
1683                       tr_port      port,
1684                       int          socket )
1685{
1686    tr_session * session;
1687
1688    managerLock( manager );
1689
1690    assert( tr_isSession( manager->session ) );
1691    session = manager->session;
1692
1693    if( tr_sessionIsAddressBlocked( session, addr ) )
1694    {
1695        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1696        tr_netClose( session, socket );
1697    }
1698    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1699    {
1700        tr_netClose( session, socket );
1701    }
1702    else /* we don't have a connection to them yet... */
1703    {
1704        tr_peerIo *    io;
1705        tr_handshake * handshake;
1706
1707        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1708
1709        handshake = tr_handshakeNew( io,
1710                                     session->encryptionMode,
1711                                     myHandshakeDoneCB,
1712                                     manager );
1713
1714        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1715
1716        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1717                                 handshakeCompare );
1718    }
1719
1720    managerUnlock( manager );
1721}
1722
1723static tr_bool
1724tr_isPex( const tr_pex * pex )
1725{
1726    return pex && tr_isAddress( &pex->addr );
1727}
1728
1729void
1730tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
1731                  const tr_pex * pex, int8_t seedProbability )
1732{
1733    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1734    {
1735        Torrent * t = tor->torrentPeers;
1736        managerLock( t->manager );
1737
1738        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1739            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1740                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
1741
1742        managerUnlock( t->manager );
1743    }
1744}
1745
1746void
1747tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
1748{
1749    Torrent * t = tor->torrentPeers;
1750    const int n = tr_ptrArraySize( &t->pool );
1751    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
1752    struct peer_atom ** end = it + n;
1753
1754    while( it != end )
1755        atomSetSeed( *it++ );
1756}
1757
1758tr_pex *
1759tr_peerMgrCompactToPex( const void *    compact,
1760                        size_t          compactLen,
1761                        const uint8_t * added_f,
1762                        size_t          added_f_len,
1763                        size_t *        pexCount )
1764{
1765    size_t          i;
1766    size_t          n = compactLen / 6;
1767    const uint8_t * walk = compact;
1768    tr_pex *        pex = tr_new0( tr_pex, n );
1769
1770    for( i = 0; i < n; ++i )
1771    {
1772        pex[i].addr.type = TR_AF_INET;
1773        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1774        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1775        if( added_f && ( n == added_f_len ) )
1776            pex[i].flags = added_f[i];
1777    }
1778
1779    *pexCount = n;
1780    return pex;
1781}
1782
1783tr_pex *
1784tr_peerMgrCompact6ToPex( const void    * compact,
1785                         size_t          compactLen,
1786                         const uint8_t * added_f,
1787                         size_t          added_f_len,
1788                         size_t        * pexCount )
1789{
1790    size_t          i;
1791    size_t          n = compactLen / 18;
1792    const uint8_t * walk = compact;
1793    tr_pex *        pex = tr_new0( tr_pex, n );
1794
1795    for( i = 0; i < n; ++i )
1796    {
1797        pex[i].addr.type = TR_AF_INET6;
1798        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1799        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1800        if( added_f && ( n == added_f_len ) )
1801            pex[i].flags = added_f[i];
1802    }
1803
1804    *pexCount = n;
1805    return pex;
1806}
1807
1808tr_pex *
1809tr_peerMgrArrayToPex( const void * array,
1810                      size_t       arrayLen,
1811                      size_t      * pexCount )
1812{
1813    size_t          i;
1814    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1815    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1816    const uint8_t * walk = array;
1817    tr_pex        * pex = tr_new0( tr_pex, n );
1818
1819    for( i = 0 ; i < n ; i++ ) {
1820        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1821        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1822        pex[i].flags = 0x00;
1823        walk += sizeof( tr_address ) + 2;
1824    }
1825
1826    *pexCount = n;
1827    return pex;
1828}
1829
1830/**
1831***
1832**/
1833
1834void
1835tr_peerMgrSetBlame( tr_torrent     * tor,
1836                    tr_piece_index_t pieceIndex,
1837                    int              success )
1838{
1839    if( !success )
1840    {
1841        int        peerCount, i;
1842        Torrent *  t = tor->torrentPeers;
1843        tr_peer ** peers;
1844
1845        assert( torrentIsLocked( t ) );
1846
1847        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1848        for( i = 0; i < peerCount; ++i )
1849        {
1850            tr_peer * peer = peers[i];
1851            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1852            {
1853                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1854                        tr_atomAddrStr( peer->atom ),
1855                        pieceIndex, (int)peer->strikes + 1 );
1856                addStrike( t, peer );
1857            }
1858        }
1859    }
1860}
1861
1862int
1863tr_pexCompare( const void * va, const void * vb )
1864{
1865    const tr_pex * a = va;
1866    const tr_pex * b = vb;
1867    int i;
1868
1869    assert( tr_isPex( a ) );
1870    assert( tr_isPex( b ) );
1871
1872    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1873        return i;
1874
1875    if( a->port != b->port )
1876        return a->port < b->port ? -1 : 1;
1877
1878    return 0;
1879}
1880
1881#if 0
1882static int
1883peerPrefersCrypto( const tr_peer * peer )
1884{
1885    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1886        return TRUE;
1887
1888    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1889        return FALSE;
1890
1891    return tr_peerIoIsEncrypted( peer->io );
1892}
1893#endif
1894
1895/* better goes first */
1896static int
1897compareAtomsByUsefulness( const void * va, const void *vb )
1898{
1899    const struct peer_atom * a = * (const struct peer_atom**) va;
1900    const struct peer_atom * b = * (const struct peer_atom**) vb;
1901
1902    assert( tr_isAtom( a ) );
1903    assert( tr_isAtom( b ) );
1904
1905    if( a->piece_data_time != b->piece_data_time )
1906        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1907    if( a->from != b->from )
1908        return a->from < b->from ? -1 : 1;
1909    if( a->numFails != b->numFails )
1910        return a->numFails < b->numFails ? -1 : 1;
1911
1912    return 0;
1913}
1914
1915int
1916tr_peerMgrGetPeers( tr_torrent   * tor,
1917                    tr_pex      ** setme_pex,
1918                    uint8_t        af,
1919                    uint8_t        list_mode,
1920                    int            maxCount )
1921{
1922    int i;
1923    int n;
1924    int count = 0;
1925    int atomCount = 0;
1926    const Torrent * t = tor->torrentPeers;
1927    struct peer_atom ** atoms = NULL;
1928    tr_pex * pex;
1929    tr_pex * walk;
1930
1931    assert( tr_isTorrent( tor ) );
1932    assert( setme_pex != NULL );
1933    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1934    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1935
1936    managerLock( t->manager );
1937
1938    /**
1939    ***  build a list of atoms
1940    **/
1941
1942    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1943    {
1944        int i;
1945        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1946        atomCount = tr_ptrArraySize( &t->peers );
1947        atoms = tr_new( struct peer_atom *, atomCount );
1948        for( i=0; i<atomCount; ++i )
1949            atoms[i] = peers[i]->atom;
1950    }
1951    else /* TR_PEERS_ALL */
1952    {
1953        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1954        atomCount = tr_ptrArraySize( &t->pool );
1955        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1956    }
1957
1958    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1959
1960    /**
1961    ***  add the first N of them into our return list
1962    **/
1963
1964    n = MIN( atomCount, maxCount );
1965    pex = walk = tr_new0( tr_pex, n );
1966
1967    for( i=0; i<atomCount && count<n; ++i )
1968    {
1969        const struct peer_atom * atom = atoms[i];
1970        if( atom->addr.type == af )
1971        {
1972            assert( tr_isAddress( &atom->addr ) );
1973            walk->addr = atom->addr;
1974            walk->port = atom->port;
1975            walk->flags = atom->flags;
1976            ++count;
1977            ++walk;
1978        }
1979    }
1980
1981    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1982
1983    assert( ( walk - pex ) == count );
1984    *setme_pex = pex;
1985
1986    /* cleanup */
1987    tr_free( atoms );
1988    managerUnlock( t->manager );
1989    return count;
1990}
1991
1992static void atomPulse      ( int, short, void * );
1993static void bandwidthPulse ( int, short, void * );
1994static void rechokePulse   ( int, short, void * );
1995static void reconnectPulse ( int, short, void * );
1996
1997static struct event *
1998createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1999{
2000    struct event * timer = tr_new0( struct event, 1 );
2001    evtimer_set( timer, callback, cbdata );
2002    tr_timerAddMsec( timer, msec );
2003    return timer;
2004}
2005
2006static void
2007ensureMgrTimersExist( struct tr_peerMgr * m )
2008{
2009    if( m->atomTimer == NULL )
2010        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
2011
2012    if( m->bandwidthTimer == NULL )
2013        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2014
2015    if( m->rechokeTimer == NULL )
2016        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
2017
2018   if( m->refillUpkeepTimer == NULL )
2019        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2020}
2021
2022void
2023tr_peerMgrStartTorrent( tr_torrent * tor )
2024{
2025    Torrent * t = tor->torrentPeers;
2026
2027    assert( t != NULL );
2028    managerLock( t->manager );
2029    ensureMgrTimersExist( t->manager );
2030
2031    t->isRunning = TRUE;
2032
2033    rechokePulse( 0, 0, t->manager );
2034    managerUnlock( t->manager );
2035}
2036
2037static void
2038stopTorrent( Torrent * t )
2039{
2040    int i, n;
2041
2042    assert( torrentIsLocked( t ) );
2043
2044    t->isRunning = FALSE;
2045
2046    /* disconnect the peers. */
2047    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2048        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2049    tr_ptrArrayClear( &t->peers );
2050
2051    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
2052     * which removes the handshake from t->outgoingHandshakes... */
2053    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2054        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2055}
2056
2057void
2058tr_peerMgrStopTorrent( tr_torrent * tor )
2059{
2060    Torrent * t = tor->torrentPeers;
2061
2062    managerLock( t->manager );
2063
2064    stopTorrent( t );
2065
2066    managerUnlock( t->manager );
2067}
2068
2069void
2070tr_peerMgrAddTorrent( tr_peerMgr * manager,
2071                      tr_torrent * tor )
2072{
2073    managerLock( manager );
2074
2075    assert( tor );
2076    assert( tor->torrentPeers == NULL );
2077
2078    tor->torrentPeers = torrentConstructor( manager, tor );
2079
2080    managerUnlock( manager );
2081}
2082
2083void
2084tr_peerMgrRemoveTorrent( tr_torrent * tor )
2085{
2086    tr_torrentLock( tor );
2087
2088    stopTorrent( tor->torrentPeers );
2089    torrentDestructor( tor->torrentPeers );
2090
2091    tr_torrentUnlock( tor );
2092}
2093
2094void
2095tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2096                               int8_t           * tab,
2097                               unsigned int       tabCount )
2098{
2099    tr_piece_index_t   i;
2100    const Torrent *    t;
2101    float              interval;
2102    tr_bool            isSeed;
2103    int                peerCount;
2104    const tr_peer **   peers;
2105    tr_torrentLock( tor );
2106
2107    t = tor->torrentPeers;
2108    tor = t->tor;
2109    interval = tor->info.pieceCount / (float)tabCount;
2110    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2111    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2112    peerCount = tr_ptrArraySize( &t->peers );
2113
2114    memset( tab, 0, tabCount );
2115
2116    for( i = 0; tor && i < tabCount; ++i )
2117    {
2118        const int piece = i * interval;
2119
2120        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2121            tab[i] = -1;
2122        else if( peerCount ) {
2123            int j;
2124            for( j = 0; j < peerCount; ++j )
2125                if( tr_bitsetHas( &peers[j]->have, i ) )
2126                    ++tab[i];
2127        }
2128    }
2129
2130    tr_torrentUnlock( tor );
2131}
2132
2133/* Returns the pieces that are available from peers */
2134tr_bitfield*
2135tr_peerMgrGetAvailable( const tr_torrent * tor )
2136{
2137    int i;
2138    int peerCount;
2139    Torrent * t = tor->torrentPeers;
2140    const tr_peer ** peers;
2141    tr_bitfield * pieces;
2142    managerLock( t->manager );
2143
2144    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2145    peerCount = tr_ptrArraySize( &t->peers );
2146    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2147    for( i=0; i<peerCount; ++i )
2148        tr_bitsetOr( pieces, &peers[i]->have );
2149
2150    managerUnlock( t->manager );
2151    return pieces;
2152}
2153
2154void
2155tr_peerMgrTorrentStats( tr_torrent       * tor,
2156                        int              * setmePeersKnown,
2157                        int              * setmePeersConnected,
2158                        int              * setmeSeedsConnected,
2159                        int              * setmeWebseedsSendingToUs,
2160                        int              * setmePeersSendingToUs,
2161                        int              * setmePeersGettingFromUs,
2162                        int              * setmePeersFrom )
2163{
2164    int i, size;
2165    const Torrent * t = tor->torrentPeers;
2166    const tr_peer ** peers;
2167    const tr_webseed ** webseeds;
2168
2169    managerLock( t->manager );
2170
2171    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2172    size = tr_ptrArraySize( &t->peers );
2173
2174    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2175    *setmePeersConnected       = 0;
2176    *setmeSeedsConnected       = 0;
2177    *setmePeersGettingFromUs   = 0;
2178    *setmePeersSendingToUs     = 0;
2179    *setmeWebseedsSendingToUs  = 0;
2180
2181    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2182        setmePeersFrom[i] = 0;
2183
2184    for( i=0; i<size; ++i )
2185    {
2186        const tr_peer * peer = peers[i];
2187        const struct peer_atom * atom = peer->atom;
2188
2189        if( peer->io == NULL ) /* not connected */
2190            continue;
2191
2192        ++*setmePeersConnected;
2193
2194        ++setmePeersFrom[atom->from];
2195
2196        if( clientIsDownloadingFrom( tor, peer ) )
2197            ++*setmePeersSendingToUs;
2198
2199        if( clientIsUploadingTo( peer ) )
2200            ++*setmePeersGettingFromUs;
2201
2202        if( atomIsSeed( atom ) )
2203            ++*setmeSeedsConnected;
2204    }
2205
2206    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2207    size = tr_ptrArraySize( &t->webseeds );
2208    for( i=0; i<size; ++i )
2209        if( tr_webseedIsActive( webseeds[i] ) )
2210            ++*setmeWebseedsSendingToUs;
2211
2212    managerUnlock( t->manager );
2213}
2214
2215float
2216tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2217{
2218    int i;
2219    float tmp;
2220    float ret = 0;
2221
2222    const Torrent * t = tor->torrentPeers;
2223    const int n = tr_ptrArraySize( &t->webseeds );
2224    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2225
2226    for( i=0; i<n; ++i )
2227        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2228            ret += tmp;
2229
2230    return ret;
2231}
2232
2233
2234float*
2235tr_peerMgrWebSpeeds( const tr_torrent * tor )
2236{
2237    const Torrent * t = tor->torrentPeers;
2238    const tr_webseed ** webseeds;
2239    int i;
2240    int webseedCount;
2241    float * ret;
2242    uint64_t now;
2243
2244    assert( t->manager );
2245    managerLock( t->manager );
2246
2247    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2248    webseedCount = tr_ptrArraySize( &t->webseeds );
2249    assert( webseedCount == tor->info.webseedCount );
2250    ret = tr_new0( float, webseedCount );
2251    now = tr_date( );
2252
2253    for( i=0; i<webseedCount; ++i )
2254        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2255            ret[i] = -1.0;
2256
2257    managerUnlock( t->manager );
2258    return ret;
2259}
2260
2261double
2262tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2263{
2264    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2265}
2266
2267
2268struct tr_peer_stat *
2269tr_peerMgrPeerStats( const tr_torrent    * tor,
2270                     int                 * setmeCount )
2271{
2272    int i, size;
2273    const Torrent * t = tor->torrentPeers;
2274    const tr_peer ** peers;
2275    tr_peer_stat * ret;
2276    uint64_t now;
2277
2278    assert( t->manager );
2279    managerLock( t->manager );
2280
2281    size = tr_ptrArraySize( &t->peers );
2282    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2283    ret = tr_new0( tr_peer_stat, size );
2284    now = tr_date( );
2285
2286    for( i=0; i<size; ++i )
2287    {
2288        char *                   pch;
2289        const tr_peer *          peer = peers[i];
2290        const struct peer_atom * atom = peer->atom;
2291        tr_peer_stat *           stat = ret + i;
2292
2293        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2294        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2295                   sizeof( stat->client ) );
2296        stat->port                = ntohs( peer->atom->port );
2297        stat->from                = atom->from;
2298        stat->progress            = peer->progress;
2299        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2300        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2301        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2302        stat->peerIsChoked        = peer->peerIsChoked;
2303        stat->peerIsInterested    = peer->peerIsInterested;
2304        stat->clientIsChoked      = peer->clientIsChoked;
2305        stat->clientIsInterested  = peer->clientIsInterested;
2306        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2307        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2308        stat->isUploadingTo       = clientIsUploadingTo( peer );
2309        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2310
2311        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC*1000 );
2312        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC*1000 );
2313        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC*1000 );
2314        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC*1000 );
2315
2316        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2317        stat->pendingReqsToClient = peer->pendingReqsToClient;
2318
2319        pch = stat->flagStr;
2320        if( t->optimistic == peer ) *pch++ = 'O';
2321        if( stat->isDownloadingFrom ) *pch++ = 'D';
2322        else if( stat->clientIsInterested ) *pch++ = 'd';
2323        if( stat->isUploadingTo ) *pch++ = 'U';
2324        else if( stat->peerIsInterested ) *pch++ = 'u';
2325        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2326        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2327        if( stat->isEncrypted ) *pch++ = 'E';
2328        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2329        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2330        if( stat->isIncoming ) *pch++ = 'I';
2331        *pch = '\0';
2332    }
2333
2334    *setmeCount = size;
2335
2336    managerUnlock( t->manager );
2337    return ret;
2338}
2339
2340/**
2341***
2342**/
2343
2344/* do we still want this piece and does the peer have it? */
2345static tr_bool
2346isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2347{
2348    return ( !tor->info.pieces[index].dnd ) /* we want it */
2349        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2350        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2351}
2352
2353/* does this peer have any pieces that we want? */
2354static tr_bool
2355isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2356{
2357    tr_piece_index_t i, n;
2358
2359    if ( tr_torrentIsSeed( tor ) )
2360        return FALSE;
2361
2362    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2363        return FALSE;
2364
2365    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2366        if( isPieceInteresting( tor, peer, i ) )
2367            return TRUE;
2368
2369    return FALSE;
2370}
2371
2372/* determines who we send "interested" messages to */
2373static void
2374rechokeDownloads( Torrent * t )
2375{
2376    int i;
2377    const uint64_t now = tr_date( );
2378    const int msec = 60 * 1000;
2379    const int MIN_INTERESTING_PEERS = 5;
2380    const int peerCount = tr_ptrArraySize( &t->peers );
2381    int maxPeers;
2382
2383    int badCount         = 0;
2384    int goodCount        = 0;
2385    int untestedCount    = 0;
2386    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2387    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2388    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2389
2390    /* decide how many peers to be interested in */
2391    {
2392        int blocks = 0;
2393        int cancels = 0;
2394
2395        /* Count up how many blocks & cancels each peer has.
2396         *
2397         * There are two situations where we send out cancels --
2398         *
2399         * 1. We've got unresponsive peers, which is handled by deciding
2400         *    -which- peers to be interested in.
2401         *
2402         * 2. We've hit our bandwidth cap, which is handled by deciding
2403         *    -how many- peers to be interested in.
2404         *
2405         * We're working on 2. here, so we need to ignore unresponsive
2406         * peers in our calculations lest they confuse Transmission into
2407         * thinking it's hit its bandwidth cap.
2408         */
2409        for( i=0; i<peerCount; ++i )
2410        {
2411            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2412            const int b = tr_historyGet( peer->blocksSentToClient, now, msec );
2413            const int c = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2414
2415            if( b == 0 ) /* ignore unresponsive peers, as described above */
2416                continue;
2417
2418            blocks += b;
2419            cancels += c;
2420        }
2421
2422        if( !t->interestedCount )
2423        {
2424            /* this is the torrent's first time to call this function...
2425             * start off optimistically by allowing interest in many peers */
2426            maxPeers = t->tor->maxConnectedPeers;
2427        }
2428        else if( !blocks )
2429        {
2430            /* we've gotten cancels but zero blocks...
2431             * something is seriously wrong.  throttle back sharply */
2432            maxPeers = t->interestedCount * 0.5;
2433        }
2434        else
2435        {
2436            const double cancelRate = cancels / (double)(cancels + blocks);
2437                 if( cancelRate >= 0.20 ) maxPeers = t->interestedCount * 0.7;
2438            else if( cancelRate >= 0.10 ) maxPeers = t->interestedCount * 0.8;
2439            else if( cancelRate >= 0.05 ) maxPeers = t->interestedCount * 0.9;
2440            else if( cancelRate >= 0.01 ) maxPeers = t->interestedCount;
2441            else                          maxPeers = t->interestedCount + 1;
2442
2443            /* if things are getting worse, don't add more peers */
2444            if( ( t->cancelRate > 0.01 ) && ( cancelRate > t->cancelRate ) )
2445                maxPeers = MIN( maxPeers, t->interestedCount );
2446
2447            t->cancelRate = cancelRate;
2448
2449            tordbg( t, "cancel rate is %.3f -- changing the "
2450                       "number of peers we're interested in from %d to %d",
2451                       cancelRate, t->interestedCount, maxPeers );
2452        }
2453    }
2454
2455    /* don't let the previous paragraph's number tweaking go too far... */
2456    if( maxPeers < MIN_INTERESTING_PEERS )
2457        maxPeers = MIN_INTERESTING_PEERS;
2458    if( maxPeers > t->tor->maxConnectedPeers )
2459        maxPeers = t->tor->maxConnectedPeers;
2460
2461    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2462     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2463     * That's the order in which we'll choose who to show interest in */
2464    for( i=0; i<peerCount; ++i )
2465    {
2466        tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2467
2468        if( !isPeerInteresting( t->tor, peer ) )
2469        {
2470            tr_peerMsgsSetInterested( peer->msgs, FALSE );
2471        }
2472        else
2473        {
2474            const int blocks = tr_historyGet( peer->blocksSentToClient, now, msec );
2475            const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2476
2477            if( !blocks && !cancels )
2478                untested[untestedCount++] = peer;
2479            else if( !cancels )
2480                good[goodCount++] = peer;
2481            else if( !blocks )
2482                bad[badCount++] = peer;
2483            else if( ( cancels * 10 ) < blocks )
2484                good[goodCount++] = peer;
2485            else
2486                bad[badCount++] = peer;
2487        }
2488    }
2489
2490    t->interestedCount = 0;
2491
2492    /* We've decided (1) how many peers to be interested in,
2493     * and (2) which peers are the best candidates,
2494     * Now it's time to update our `interest' flags. */
2495    for( i=0; i<goodCount; ++i ) {
2496        const tr_bool b = t->interestedCount < maxPeers;
2497        tr_peerMsgsSetInterested( good[i]->msgs, b );
2498        if( b )
2499            ++t->interestedCount;
2500    }
2501    for( i=0; i<untestedCount; ++i ) {
2502        const tr_bool b = t->interestedCount < maxPeers;
2503        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2504        if( b )
2505            ++t->interestedCount;
2506    }
2507    for( i=0; i<badCount; ++i ) {
2508        const tr_bool b = t->interestedCount < maxPeers;
2509        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2510        if( b )
2511            ++t->interestedCount;
2512    }
2513
2514/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2515
2516    /* cleanup */
2517    tr_free( untested );
2518    tr_free( good );
2519    tr_free( bad );
2520}
2521
2522/**
2523***
2524**/
2525
2526struct ChokeData
2527{
2528    tr_bool         doUnchoke;
2529    tr_bool         isInterested;
2530    tr_bool         isChoked;
2531    int             rate;
2532    tr_peer *       peer;
2533};
2534
2535static int
2536compareChoke( const void * va,
2537              const void * vb )
2538{
2539    const struct ChokeData * a = va;
2540    const struct ChokeData * b = vb;
2541
2542    if( a->rate != b->rate ) /* prefer higher overall speeds */
2543        return a->rate > b->rate ? -1 : 1;
2544
2545    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2546        return a->isChoked ? 1 : -1;
2547
2548    return 0;
2549}
2550
2551/* is this a new connection? */
2552static int
2553isNew( const tr_peer * peer )
2554{
2555    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2556}
2557
2558static void
2559rechokeUploads( Torrent * t, const uint64_t now )
2560{
2561    int i, size, unchokedInterested;
2562    const int peerCount = tr_ptrArraySize( &t->peers );
2563    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2564    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2565    const tr_session * session = t->manager->session;
2566    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2567
2568    assert( torrentIsLocked( t ) );
2569
2570    /* sort the peers by preference and rate */
2571    for( i = 0, size = 0; i < peerCount; ++i )
2572    {
2573        tr_peer * peer = peers[i];
2574        struct peer_atom * atom = peer->atom;
2575
2576        if( peer->progress >= 1.0 ) /* choke all seeds */
2577        {
2578            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2579        }
2580        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2581        {
2582            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2583        }
2584        else if( chokeAll ) /* choke everyone if we're not uploading */
2585        {
2586            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2587        }
2588        else
2589        {
2590            struct ChokeData * n = &choke[size++];
2591            n->peer         = peer;
2592            n->isInterested = peer->peerIsInterested;
2593            n->isChoked     = peer->peerIsChoked;
2594            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2595        }
2596    }
2597
2598    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2599
2600    /**
2601     * Reciprocation and number of uploads capping is managed by unchoking
2602     * the N peers which have the best upload rate and are interested.
2603     * This maximizes the client's download rate. These N peers are
2604     * referred to as downloaders, because they are interested in downloading
2605     * from the client.
2606     *
2607     * Peers which have a better upload rate (as compared to the downloaders)
2608     * but aren't interested get unchoked. If they become interested, the
2609     * downloader with the worst upload rate gets choked. If a client has
2610     * a complete file, it uses its upload rate rather than its download
2611     * rate to decide which peers to unchoke.
2612     */
2613    unchokedInterested = 0;
2614    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2615        choke[i].doUnchoke = 1;
2616        if( choke[i].isInterested )
2617            ++unchokedInterested;
2618    }
2619
2620    /* optimistic unchoke */
2621    if( i < size )
2622    {
2623        int n;
2624        struct ChokeData * c;
2625        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2626
2627        for( ; i<size; ++i )
2628        {
2629            if( choke[i].isInterested )
2630            {
2631                const tr_peer * peer = choke[i].peer;
2632                int x = 1, y;
2633                if( isNew( peer ) ) x *= 3;
2634                for( y=0; y<x; ++y )
2635                    tr_ptrArrayAppend( &randPool, &choke[i] );
2636            }
2637        }
2638
2639        if(( n = tr_ptrArraySize( &randPool )))
2640        {
2641            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2642            c->doUnchoke = 1;
2643            t->optimistic = c->peer;
2644        }
2645
2646        tr_ptrArrayDestruct( &randPool, NULL );
2647    }
2648
2649    for( i=0; i<size; ++i )
2650        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2651
2652    /* cleanup */
2653    tr_free( choke );
2654}
2655
2656static void
2657rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2658{
2659    uint64_t now;
2660    tr_torrent * tor = NULL;
2661    tr_peerMgr * mgr = vmgr;
2662    managerLock( mgr );
2663
2664    now = tr_date( );
2665    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2666        if( tor->isRunning ) {
2667            rechokeUploads( tor->torrentPeers, now );
2668            if( !tr_torrentIsSeed( tor ) )
2669                rechokeDownloads( tor->torrentPeers );
2670        }
2671    }
2672
2673    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2674    managerUnlock( mgr );
2675}
2676
2677/***
2678****
2679****  Life and Death
2680****
2681***/
2682
2683typedef enum
2684{
2685    TR_CAN_KEEP,
2686    TR_CAN_CLOSE,
2687    TR_MUST_CLOSE,
2688}
2689tr_close_type_t;
2690
2691static tr_close_type_t
2692shouldPeerBeClosed( const Torrent    * t,
2693                    const tr_peer    * peer,
2694                    int                peerCount,
2695                    const time_t       now )
2696{
2697    const tr_torrent *       tor = t->tor;
2698    const struct peer_atom * atom = peer->atom;
2699
2700    /* if it's marked for purging, close it */
2701    if( peer->doPurge )
2702    {
2703        tordbg( t, "purging peer %s because its doPurge flag is set",
2704                tr_atomAddrStr( atom ) );
2705        return TR_MUST_CLOSE;
2706    }
2707
2708    /* if we're seeding and the peer has everything we have,
2709     * and enough time has passed for a pex exchange, then disconnect */
2710    if( tr_torrentIsSeed( tor ) )
2711    {
2712        tr_bool peerHasEverything;
2713
2714        if( atom->seedProbability != -1 )
2715        {
2716            peerHasEverything = atomIsSeed( atom );
2717        }
2718        else
2719        {
2720            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2721            tr_bitsetDifference( tmp, &peer->have );
2722            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2723            tr_bitfieldFree( tmp );
2724        }
2725
2726        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2727        {
2728            tordbg( t, "purging peer %s because we're both seeds",
2729                    tr_atomAddrStr( atom ) );
2730            return TR_MUST_CLOSE;
2731        }
2732    }
2733
2734    /* disconnect if it's been too long since piece data has been transferred.
2735     * this is on a sliding scale based on number of available peers... */
2736    {
2737        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2738        /* if we have >= relaxIfFewerThan, strictness is 100%.
2739         * if we have zero connections, strictness is 0% */
2740        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2741                               ? 1.0
2742                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2743        const int lo = MIN_UPLOAD_IDLE_SECS;
2744        const int hi = MAX_UPLOAD_IDLE_SECS;
2745        const int limit = hi - ( ( hi - lo ) * strictness );
2746        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2747/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2748        if( idleTime > limit ) {
2749            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2750                       tr_atomAddrStr( atom ), idleTime );
2751            return TR_CAN_CLOSE;
2752        }
2753    }
2754
2755    return TR_CAN_KEEP;
2756}
2757
2758static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2759
2760static tr_peer **
2761getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2762{
2763    int i, peerCount, outsize;
2764    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2765    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2766
2767    assert( torrentIsLocked( t ) );
2768
2769    for( i = outsize = 0; i < peerCount; ++i )
2770        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2771            ret[outsize++] = peers[i];
2772
2773    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2774
2775    *setmeSize = outsize;
2776    return ret;
2777}
2778
2779static int
2780getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2781{
2782    int sec;
2783
2784    /* if we were recently connected to this peer and transferring piece
2785     * data, try to reconnect to them sooner rather that later -- we don't
2786     * want network troubles to get in the way of a good peer. */
2787    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2788        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2789
2790    /* don't allow reconnects more often than our minimum */
2791    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2792        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2793
2794    /* otherwise, the interval depends on how many times we've tried
2795     * and failed to connect to the peer */
2796    else switch( atom->numFails ) {
2797        case 0: sec = 0; break;
2798        case 1: sec = 5; break;
2799        case 2: sec = 2 * 60; break;
2800        case 3: sec = 15 * 60; break;
2801        case 4: sec = 30 * 60; break;
2802        case 5: sec = 60 * 60; break;
2803        default: sec = 120 * 60; break;
2804    }
2805
2806    /* penalize peers that were unreachable the last time we tried */
2807    if( atom->myflags & MYFLAG_UNREACHABLE )
2808        sec += sec;
2809
2810    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
2811    return sec;
2812}
2813
2814static void
2815closePeer( Torrent * t, tr_peer * peer )
2816{
2817    struct peer_atom * atom;
2818
2819    assert( t != NULL );
2820    assert( peer != NULL );
2821
2822    atom = peer->atom;
2823
2824    /* if we transferred piece data, then they might be good peers,
2825       so reset their `numFails' weight to zero.  otherwise we connected
2826       to them fruitlessly, so mark it as another fail */
2827    if( atom->piece_data_time ) {
2828        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
2829        atom->numFails = 0;
2830    } else {
2831        ++atom->numFails;
2832        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
2833    }
2834
2835    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2836    removePeer( t, peer );
2837}
2838
2839static void
2840closeBadPeers( Torrent * t )
2841{
2842    const time_t  now = tr_time( );
2843
2844    if( !t->isRunning )
2845    {
2846        removeAllPeers( t );
2847    }
2848    else
2849    {
2850        int i;
2851        int mustCloseCount;
2852        struct tr_peer ** mustClose;
2853
2854        /* disconnect the really bad peers */
2855        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2856        for( i=0; i<mustCloseCount; ++i )
2857            closePeer( t, mustClose[i] );
2858        tr_free( mustClose );
2859    }
2860}
2861
2862struct peer_liveliness
2863{
2864    tr_peer * peer;
2865    void * clientData;
2866    time_t pieceDataTime;
2867    time_t time;
2868    int speed;
2869    tr_bool doPurge;
2870};
2871
2872static int
2873comparePeerLiveliness( const void * va, const void * vb )
2874{
2875    const struct peer_liveliness * a = va;
2876    const struct peer_liveliness * b = vb;
2877
2878    if( a->doPurge != b->doPurge )
2879        return a->doPurge ? 1 : -1;
2880
2881    if( a->speed != b->speed ) /* faster goes first */
2882        return a->speed > b->speed ? -1 : 1;
2883
2884    /* the one to give us data more recently goes first */
2885    if( a->pieceDataTime != b->pieceDataTime )
2886        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2887
2888    /* the one we connected to most recently goes first */
2889    if( a->time != b->time )
2890        return a->time > b->time ? -1 : 1;
2891
2892    return 0;
2893}
2894
2895static int
2896comparePeerLivelinessReverse( const void * va, const void * vb )
2897{
2898    return -comparePeerLiveliness (va, vb);
2899}
2900
2901static void
2902sortPeersByLivelinessImpl( tr_peer  ** peers,
2903                           void     ** clientData,
2904                           int         n,
2905                           uint64_t    now,
2906                           int (*compare) ( const void *va, const void *vb ) )
2907{
2908    int i;
2909    struct peer_liveliness *lives, *l;
2910
2911    /* build a sortable array of peer + extra info */
2912    lives = l = tr_new0( struct peer_liveliness, n );
2913    for( i=0; i<n; ++i, ++l )
2914    {
2915        tr_peer * p = peers[i];
2916        l->peer = p;
2917        l->doPurge = p->doPurge;
2918        l->pieceDataTime = p->atom->piece_data_time;
2919        l->time = p->atom->time;
2920        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2921                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2922        if( clientData )
2923            l->clientData = clientData[i];
2924    }
2925
2926    /* sort 'em */
2927    assert( n == ( l - lives ) );
2928    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2929
2930    /* build the peer array */
2931    for( i=0, l=lives; i<n; ++i, ++l ) {
2932        peers[i] = l->peer;
2933        if( clientData )
2934            clientData[i] = l->clientData;
2935    }
2936    assert( n == ( l - lives ) );
2937
2938    /* cleanup */
2939    tr_free( lives );
2940}
2941
2942static void
2943sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2944{
2945    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2946}
2947
2948static void
2949sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2950{
2951    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2952}
2953
2954
2955static void
2956enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2957{
2958    int n = tr_ptrArraySize( &t->peers );
2959    const int max = tr_torrentGetPeerLimit( t->tor );
2960    if( n > max )
2961    {
2962        void * base = tr_ptrArrayBase( &t->peers );
2963        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2964        sortPeersByLiveliness( peers, NULL, n, now );
2965        while( n > max )
2966            closePeer( t, peers[--n] );
2967        tr_free( peers );
2968    }
2969}
2970
2971static void
2972enforceSessionPeerLimit( tr_session * session, uint64_t now )
2973{
2974    int n = 0;
2975    tr_torrent * tor = NULL;
2976    const int max = tr_sessionGetPeerLimit( session );
2977
2978    /* count the total number of peers */
2979    while(( tor = tr_torrentNext( session, tor )))
2980        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2981
2982    /* if there are too many, prune out the worst */
2983    if( n > max )
2984    {
2985        tr_peer ** peers = tr_new( tr_peer*, n );
2986        Torrent ** torrents = tr_new( Torrent*, n );
2987
2988        /* populate the peer array */
2989        n = 0;
2990        tor = NULL;
2991        while(( tor = tr_torrentNext( session, tor ))) {
2992            int i;
2993            Torrent * t = tor->torrentPeers;
2994            const int tn = tr_ptrArraySize( &t->peers );
2995            for( i=0; i<tn; ++i, ++n ) {
2996                peers[n] = tr_ptrArrayNth( &t->peers, i );
2997                torrents[n] = t;
2998            }
2999        }
3000
3001        /* sort 'em */
3002        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3003
3004        /* cull out the crappiest */
3005        while( n-- > max )
3006            closePeer( torrents[n], peers[n] );
3007
3008        /* cleanup */
3009        tr_free( torrents );
3010        tr_free( peers );
3011    }
3012}
3013
3014static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3015
3016static void
3017reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3018{
3019    tr_torrent * tor;
3020    tr_peerMgr * mgr = vmgr;
3021    const uint64_t now = tr_date( );
3022
3023    /**
3024    ***  enforce the per-session and per-torrent peer limits
3025    **/
3026
3027    /* if we're over the per-torrent peer limits, cull some peers */
3028    tor = NULL;
3029    while(( tor = tr_torrentNext( mgr->session, tor )))
3030        if( tor->isRunning )
3031            enforceTorrentPeerLimit( tor->torrentPeers, now );
3032
3033    /* if we're over the per-session peer limits, cull some peers */
3034    enforceSessionPeerLimit( mgr->session, now );
3035
3036    /* remove crappy peers */
3037    tor = NULL;
3038    while(( tor = tr_torrentNext( mgr->session, tor )))
3039        closeBadPeers( tor->torrentPeers );
3040
3041    /* try to make new peer connections */
3042    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3043}
3044
3045/****
3046*****
3047*****  BANDWIDTH ALLOCATION
3048*****
3049****/
3050
3051static void
3052pumpAllPeers( tr_peerMgr * mgr )
3053{
3054    tr_torrent * tor = NULL;
3055
3056    while(( tor = tr_torrentNext( mgr->session, tor )))
3057    {
3058        int j;
3059        Torrent * t = tor->torrentPeers;
3060
3061        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3062        {
3063            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3064            tr_peerMsgsPulse( peer->msgs );
3065        }
3066    }
3067}
3068
3069static void
3070bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3071{
3072    tr_torrent * tor;
3073    tr_peerMgr * mgr = vmgr;
3074    managerLock( mgr );
3075
3076    /* FIXME: this next line probably isn't necessary... */
3077    pumpAllPeers( mgr );
3078
3079    /* allocate bandwidth to the peers */
3080    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3081    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3082
3083    /* possibly stop torrents that have seeded enough */
3084    tor = NULL;
3085    while(( tor = tr_torrentNext( mgr->session, tor )))
3086        tr_torrentCheckSeedRatio( tor );
3087
3088    /* run the completeness check for any torrents that need it */
3089    tor = NULL;
3090    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3091        if( tor->torrentPeers->needsCompletenessCheck ) {
3092            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3093            tr_torrentRecheckCompleteness( tor );
3094        }
3095    }
3096
3097    /* possibly stop torrents that have an error */
3098    tor = NULL;
3099    while(( tor = tr_torrentNext( mgr->session, tor )))
3100        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3101            tr_torrentStop( tor );
3102
3103    reconnectPulse( 0, 0, mgr );
3104
3105    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3106    managerUnlock( mgr );
3107}
3108
3109/***
3110****
3111***/
3112
3113static int
3114compareAtomPtrsByAddress( const void * va, const void *vb )
3115{
3116    const struct peer_atom * a = * (const struct peer_atom**) va;
3117    const struct peer_atom * b = * (const struct peer_atom**) vb;
3118
3119    assert( tr_isAtom( a ) );
3120    assert( tr_isAtom( b ) );
3121
3122    return tr_compareAddresses( &a->addr, &b->addr );
3123}
3124
3125/* best come first, worst go last */
3126static int
3127compareAtomPtrsByShelfDate( const void * va, const void *vb )
3128{
3129    time_t atime;
3130    time_t btime;
3131    const struct peer_atom * a = * (const struct peer_atom**) va;
3132    const struct peer_atom * b = * (const struct peer_atom**) vb;
3133    const int data_time_cutoff_secs = 60 * 60;
3134    const time_t tr_now = tr_time( );
3135
3136    assert( tr_isAtom( a ) );
3137    assert( tr_isAtom( b ) );
3138
3139    /* primary key: the last piece data time *if* it was within the last hour */
3140    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3141    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3142    if( atime != btime )
3143        return atime > btime ? -1 : 1;
3144
3145    /* secondary key: shelf date. */
3146    if( a->shelf_date != b->shelf_date )
3147        return a->shelf_date > b->shelf_date ? -1 : 1;
3148
3149    return 0;
3150}
3151
3152static int
3153getMaxAtomCount( const tr_torrent * tor )
3154{
3155    /* FIXME: this curve should be smoother... */
3156    const int n = tor->maxConnectedPeers;
3157    if( n >= 200 ) return n * 1.5;
3158    if( n >= 100 ) return n * 2;
3159    if( n >=  50 ) return n * 3;
3160    if( n >=  20 ) return n * 5;
3161    return n * 10;
3162}
3163
3164static void
3165atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3166{
3167    tr_torrent * tor = NULL;
3168    tr_peerMgr * mgr = vmgr;
3169    managerLock( mgr );
3170
3171    while(( tor = tr_torrentNext( mgr->session, tor )))
3172    {
3173        int atomCount;
3174        Torrent * t = tor->torrentPeers;
3175        const int maxAtomCount = getMaxAtomCount( tor );
3176        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3177
3178        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3179        {
3180            int i;
3181            int keepCount = 0;
3182            int testCount = 0;
3183            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3184            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3185
3186            /* keep the ones that are in use */
3187            for( i=0; i<atomCount; ++i ) {
3188                struct peer_atom * atom = atoms[i];
3189                if( peerIsInUse( t, atom ) )
3190                    keep[keepCount++] = atom;
3191                else
3192                    test[testCount++] = atom;
3193            }
3194
3195            /* if there's room, keep the best of what's left */
3196            i = 0;
3197            if( keepCount < maxAtomCount ) {
3198                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3199                while( i<testCount && keepCount<maxAtomCount )
3200                    keep[keepCount++] = test[i++];
3201            }
3202
3203            /* free the culled atoms */
3204            while( i<testCount )
3205                tr_free( test[i++] );
3206
3207            /* rebuild Torrent.pool with what's left */
3208            tr_ptrArrayDestruct( &t->pool, NULL );
3209            t->pool = TR_PTR_ARRAY_INIT;
3210            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3211            for( i=0; i<keepCount; ++i )
3212                tr_ptrArrayAppend( &t->pool, keep[i] );
3213
3214            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3215
3216            /* cleanup */
3217            tr_free( test );
3218            tr_free( keep );
3219        }
3220    }
3221
3222    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3223    managerUnlock( mgr );
3224}
3225
3226/***
3227****
3228****
3229****
3230***/
3231
3232static inline tr_bool
3233isBandwidthMaxedOut( const tr_bandwidth * b,
3234                     const uint64_t now_msec, tr_direction dir )
3235{
3236    if( !tr_bandwidthIsLimited( b, dir ) )
3237        return FALSE;
3238    else {
3239        const double got = tr_bandwidthGetPieceSpeed( b, now_msec, dir );
3240        const double want = tr_bandwidthGetDesiredSpeed( b, dir );
3241        return got >= want;
3242    }
3243}
3244
3245/* is this atom someone that we'd want to initiate a connection to? */
3246static tr_bool
3247isPeerCandidate( const tr_torrent * tor, const struct peer_atom * atom, const time_t now )
3248{
3249    /* not if they're banned... */
3250    if( atom->myflags & MYFLAG_BANNED )
3251        return FALSE;
3252
3253    /* not if we just tried them already */
3254    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3255        return FALSE;
3256
3257    /* not if we're both seeds */
3258    if( tr_torrentIsSeed( tor ) )
3259        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3260            return FALSE;
3261 
3262    /* not if they're blocklisted */
3263    /* FIXME: maybe we should remove this atom altogether? */
3264    if( tr_sessionIsAddressBlocked( tor->session, &atom->addr ) )
3265        return FALSE;
3266
3267    /* not if we've already got a connection to them...  */
3268    if( peerIsInUse( tor->torrentPeers, atom ) )
3269        return FALSE;
3270
3271    return TRUE;
3272}
3273
3274struct peer_candidate
3275{
3276    uint64_t score;
3277    tr_torrent * tor;
3278    struct peer_atom * atom;
3279};
3280
3281static tr_bool
3282torrentWasRecentlyStarted( const tr_torrent * tor )
3283{
3284    return difftime( tr_time( ), tor->startDate ) < 120;
3285}
3286
3287static uint64_t
3288addValToKey( uint64_t value, int width, uint64_t addme )
3289{
3290    value = (value << (uint64_t)width);
3291    value |= addme;
3292    return value;
3293}
3294
3295/* smaller value is better */
3296static uint64_t
3297getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3298{
3299    uint64_t i;
3300    uint64_t score = 0;
3301    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3302
3303    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3304    i = failed ? 1 : 0;
3305    score = addValToKey( score, 1, i );
3306
3307    /* prefer the one we attempted least recently (to cycle through all peers) */
3308    i = atom->lastConnectionAttemptAt;
3309    score = addValToKey( score, 32, i );
3310
3311    /* prefer peers belonging to a torrent of a higher priority */
3312    switch( tr_torrentGetPriority( tor ) ) {
3313        case TR_PRI_HIGH:    i = 0; break;
3314        case TR_PRI_NORMAL:  i = 1; break;
3315        case TR_PRI_LOW:     i = 2; break;
3316    }
3317    score = addValToKey( score, 4, i );
3318
3319    /* prefer recently-started torrents */
3320    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3321    score = addValToKey( score, 1, i );
3322
3323    /* prefer peers that we might have a chance of uploading to...
3324       so lower seed probability is better */
3325    if( atom->seedProbability == 100 ) i = 101;
3326    else if( atom->seedProbability == -1 ) i = 100;
3327    else i = atom->seedProbability;
3328    score = addValToKey( score, 8, i );
3329
3330    /* Prefer peers that we got from more trusted sources.
3331     * lower `from' values indicate more trusted sources */
3332    score = addValToKey( score, 4, atom->from );
3333
3334    /* salt */
3335    score = addValToKey( score, 8, salt );
3336
3337    return score;
3338}
3339
3340/* sort an array of peer candidates */
3341static int
3342comparePeerCandidates( const void * va, const void * vb )
3343{
3344    const struct peer_candidate * a = va;
3345    const struct peer_candidate * b = vb;
3346
3347    if( a->score < b->score ) return -1;
3348    if( a->score > b->score ) return 1;
3349
3350    return 0;
3351}
3352
3353/** @return an array of all the atoms we might want to connect to */
3354static struct peer_candidate*
3355getPeerCandidates( tr_session * session, int * candidateCount )
3356{
3357    int n;
3358    tr_torrent * tor;
3359    struct peer_candidate * candidates;
3360    struct peer_candidate * walk;
3361    const time_t now = tr_time( );
3362    const uint64_t now_msec = tr_date( );
3363    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3364    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3365
3366    /* don't start any new handshakes if we're full up */
3367    n = 0;
3368    tor= NULL;
3369    while(( tor = tr_torrentNext( session, tor )))
3370        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3371    if( maxCandidates <= n ) {
3372        *candidateCount = 0;
3373        return NULL;
3374    }
3375
3376    /* allocate an array of candidates */
3377    n = 0;
3378    tor= NULL;
3379    while(( tor = tr_torrentNext( session, tor )))
3380        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3381    walk = candidates = tr_new( struct peer_candidate, n );
3382
3383    /* populate the candidate array */
3384    tor = NULL;
3385    while(( tor = tr_torrentNext( session, tor )))
3386    {
3387        int i, nAtoms;
3388        struct peer_atom ** atoms;
3389
3390        if( !tor->torrentPeers->isRunning )
3391            continue;
3392
3393        /* if we've already got enough peers in this torrent... */
3394        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3395            continue;
3396
3397        /* if we've already got enough speed in this torrent... */
3398        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3399            continue;
3400
3401        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3402        for( i=0; i<nAtoms; ++i )
3403        {
3404            struct peer_atom * atom = atoms[i];
3405
3406            if( isPeerCandidate( tor, atom, now ) )
3407            {
3408                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3409                walk->tor = tor;
3410                walk->atom = atom;
3411                walk->score = getPeerCandidateScore( tor, atom, salt );
3412                ++walk;
3413            }
3414        }
3415    }
3416
3417    *candidateCount = walk - candidates;
3418    if( *candidateCount > 1 )
3419        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3420    return candidates;
3421}
3422
3423static void
3424initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3425{
3426    tr_peerIo * io;
3427    const time_t now = tr_time( );
3428
3429    tordbg( t, "Starting an OUTGOING connection with %s", tr_atomAddrStr( atom ) );
3430
3431    io = tr_peerIoNewOutgoing( mgr->session,
3432                               mgr->session->bandwidth,
3433                               &atom->addr,
3434                               atom->port,
3435                               t->tor->info.hash,
3436                               t->tor->completeness == TR_SEED );
3437
3438    if( io == NULL )
3439    {
3440        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3441                tr_atomAddrStr( atom ) );
3442        atom->myflags |= MYFLAG_UNREACHABLE;
3443        atom->numFails++;
3444    }
3445    else
3446    {
3447        tr_handshake * handshake = tr_handshakeNew( io,
3448                                                    mgr->session->encryptionMode,
3449                                                    myHandshakeDoneCB,
3450                                                    mgr );
3451
3452        assert( tr_peerIoGetTorrentHash( io ) );
3453
3454        tr_peerIoUnref( io ); /* balanced by the initial ref
3455                                 in tr_peerIoNewOutgoing() */
3456
3457        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3458                                 handshakeCompare );
3459    }
3460
3461    atom->lastConnectionAttemptAt = now;
3462    atom->time = now;
3463}
3464
3465static void
3466initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3467{
3468#if 0
3469    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3470             tr_atomAddrStr( c->atom ),
3471             tr_torrentName( c->tor ),
3472             (int)c->atom->seedProbability,
3473             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3474             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3475#endif
3476
3477    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3478}
3479
3480static void
3481makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3482{
3483    int i, n;
3484    struct peer_candidate * candidates;
3485
3486    candidates = getPeerCandidates( mgr->session, &n );
3487
3488    for( i=0; i<n && i<max; ++i )
3489        initiateCandidateConnection( mgr, &candidates[i] );
3490
3491    tr_free( candidates );
3492}
Note: See TracBrowser for help on using the repository browser.