source: trunk/libtransmission/peer-mgr.c @ 9466

Last change on this file since 9466 was 9466, checked in by charles, 12 years ago

(trunk libT) turn off a debugging message in the terminal

  • Property svn:keywords set to Date Rev Author Id
File size: 73.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9466 2009-11-01 03:56:10Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 400,
49
50    /* how frequently to reallocate bandwidth */
51    BANDWIDTH_PERIOD_MSEC = 500,
52
53    /* how frequently to age out old piece request lists */
54    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = 500,
58
59    /* when many peers are available, keep idle ones this long */
60    MIN_UPLOAD_IDLE_SECS = ( 60 ),
61
62    /* when few peers are available, keep idle ones this long */
63    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
64
65    /* max # of peers to ask fer per torrent per reconnect pulse */
66    MAX_RECONNECTIONS_PER_PULSE = 8,
67
68    /* max number of peers to ask for per second overall.
69    * this throttle is to avoid overloading the router */
70    MAX_CONNECTIONS_PER_SECOND = 16,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 5,
74
75    /* amount of time to keep a list of request pieces lying around
76       before it's considered too old and needs to be rebuilt */
77    PIECE_LIST_SHELF_LIFE_SECS = 60,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    MYFLAG_BANNED = 1,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    /* unreachable for now... but not banned.
84     * if they try to connect to us it's okay */
85    MYFLAG_UNREACHABLE = 2,
86
87    /* the minimum we'll wait before attempting to reconnect to a peer */
88    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
89
90    /* this is how many blocks we'll try to queue up
91     * for the iterator to walk through */
92    ITERATOR_BLOCK_BUFFER_SIZE = 4096,
93
94    /* if the number of blocks in the iterator queue drops
95     * below this number, fill it up with more */
96    ITERATOR_LOW_MARK = 256
97};
98
99/**
100***
101**/
102
103enum
104{
105    UPLOAD_ONLY_UKNOWN,
106    UPLOAD_ONLY_YES,
107    UPLOAD_ONLY_NO
108};
109
110/**
111 * Peer information that should be kept even before we've connected and
112 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
113 * which ones would make good candidates for connecting to, and to watch out
114 * for banned peers.
115 *
116 * @see tr_peer
117 * @see tr_peermsgs
118 */
119struct peer_atom
120{
121    uint8_t     from;
122    uint8_t     flags;       /* these match the added_f flags */
123    uint8_t     myflags;     /* flags that aren't defined in added_f */
124    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
125    tr_port     port;
126    uint16_t    numFails;
127    tr_address  addr;
128    time_t      time;        /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130};
131
132static tr_bool
133tr_isAtom( const struct peer_atom * atom )
134{
135    return ( atom != NULL )
136        && ( atom->from < TR_PEER_FROM__MAX )
137        && ( tr_isAddress( &atom->addr ) );
138}
139
140static const char*
141tr_atomAddrStr( const struct peer_atom * atom )
142{
143    return tr_peerIoAddrStr( &atom->addr, atom->port );
144}
145
146struct tr_blockIteratorItem
147{
148    tr_block_index_t block;
149    uint8_t requestCount;
150};
151
152struct tr_blockIterator
153{
154    tr_bool didLoop;
155    int pos;
156    int size;
157    int begin;
158    struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE];
159    struct tr_torrent_peers * t;
160    tr_priority_t priority;
161};
162
163typedef struct tr_torrent_peers
164{
165    struct event               refillTimer;
166
167    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
168    tr_ptrArray                pool; /* struct peer_atom */
169    tr_ptrArray                peers; /* tr_peer */
170    tr_ptrArray                webseeds; /* tr_webseed */
171
172    tr_torrent               * tor;
173    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
174    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
175    struct tr_peerMgr        * manager;
176
177    tr_bool                    isRunning;
178}
179Torrent;
180
181struct tr_peerMgr
182{
183    tr_session      * session;
184    tr_ptrArray       incomingHandshakes; /* tr_handshake */
185    tr_timer        * bandwidthTimer;
186    tr_timer        * rechokeTimer;
187    tr_timer        * reconnectTimer;
188};
189
190#define tordbg( t, ... ) \
191    do { \
192        if( tr_deepLoggingIsActive( ) ) \
193            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
194    } while( 0 )
195
196#define dbgmsg( ... ) \
197    do { \
198        if( tr_deepLoggingIsActive( ) ) \
199            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
200    } while( 0 )
201
202/**
203***
204**/
205
206static TR_INLINE void
207managerLock( const struct tr_peerMgr * manager )
208{
209    tr_globalLock( manager->session );
210}
211
212static TR_INLINE void
213managerUnlock( const struct tr_peerMgr * manager )
214{
215    tr_globalUnlock( manager->session );
216}
217
218static TR_INLINE void
219torrentLock( Torrent * torrent )
220{
221    managerLock( torrent->manager );
222}
223
224static TR_INLINE void
225torrentUnlock( Torrent * torrent )
226{
227    managerUnlock( torrent->manager );
228}
229
230static TR_INLINE int
231torrentIsLocked( const Torrent * t )
232{
233    return tr_globalIsLocked( t->manager->session );
234}
235
236/**
237***
238**/
239
240static int
241handshakeCompareToAddr( const void * va, const void * vb )
242{
243    const tr_handshake * a = va;
244
245    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
246}
247
248static int
249handshakeCompare( const void * a, const void * b )
250{
251    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
252}
253
254static tr_handshake*
255getExistingHandshake( tr_ptrArray      * handshakes,
256                      const tr_address * addr )
257{
258    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
259}
260
261static int
262comparePeerAtomToAddress( const void * va, const void * vb )
263{
264    const struct peer_atom * a = va;
265
266    return tr_compareAddresses( &a->addr, vb );
267}
268
269static int
270comparePeerAtoms( const void * va, const void * vb )
271{
272    const struct peer_atom * b = vb;
273
274    assert( tr_isAtom( b ) );
275
276    return comparePeerAtomToAddress( va, &b->addr );
277}
278
279/**
280***
281**/
282
283const tr_address *
284tr_peerAddress( const tr_peer * peer )
285{
286    return &peer->atom->addr;
287}
288
289static Torrent*
290getExistingTorrent( tr_peerMgr *    manager,
291                    const uint8_t * hash )
292{
293    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
294
295    return tor == NULL ? NULL : tor->torrentPeers;
296}
297
298static int
299peerCompare( const void * a, const void * b )
300{
301    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
302}
303
304static int
305peerCompareToAddr( const void * a, const void * vb )
306{
307    return tr_compareAddresses( tr_peerAddress( a ), vb );
308}
309
310static tr_peer*
311getExistingPeer( Torrent          * torrent,
312                 const tr_address * addr )
313{
314    assert( torrentIsLocked( torrent ) );
315    assert( addr );
316
317    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
318}
319
320static struct peer_atom*
321getExistingAtom( const Torrent    * t,
322                 const tr_address * addr )
323{
324    Torrent * tt = (Torrent*)t;
325    assert( torrentIsLocked( t ) );
326    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
327}
328
329static tr_bool
330peerIsInUse( const Torrent    * ct,
331             const tr_address * addr )
332{
333    Torrent * t = (Torrent*) ct;
334
335    assert( torrentIsLocked ( t ) );
336
337    return getExistingPeer( t, addr )
338        || getExistingHandshake( &t->outgoingHandshakes, addr )
339        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
340}
341
342static tr_peer*
343peerConstructor( struct peer_atom * atom )
344{
345    tr_peer * peer = tr_new0( tr_peer, 1 );
346    peer->atom = atom;
347    return peer;
348}
349
350static tr_peer*
351getPeer( Torrent * torrent, struct peer_atom * atom )
352{
353    tr_peer * peer;
354
355    assert( torrentIsLocked( torrent ) );
356
357    peer = getExistingPeer( torrent, &atom->addr );
358
359    if( peer == NULL )
360    {
361        peer = peerConstructor( atom );
362        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
363    }
364
365    return peer;
366}
367
368static void
369peerDestructor( tr_peer * peer )
370{
371    assert( peer );
372
373    if( peer->msgs != NULL )
374    {
375        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
376        tr_peerMsgsFree( peer->msgs );
377    }
378
379    tr_peerIoClear( peer->io );
380    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
381
382    tr_bitfieldFree( peer->have );
383    tr_bitfieldFree( peer->blame );
384    tr_free( peer->client );
385
386    tr_free( peer );
387}
388
389static void
390removePeer( Torrent * t, tr_peer * peer )
391{
392    tr_peer * removed;
393    struct peer_atom * atom = peer->atom;
394
395    assert( torrentIsLocked( t ) );
396    assert( atom );
397
398    atom->time = time( NULL );
399
400    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
401    assert( removed == peer );
402    peerDestructor( removed );
403}
404
405static void
406removeAllPeers( Torrent * t )
407{
408    while( !tr_ptrArrayEmpty( &t->peers ) )
409        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
410}
411
412static void blockIteratorFree( struct tr_blockIterator ** inout );
413
414static void
415torrentDestructor( void * vt )
416{
417    Torrent * t = vt;
418
419    assert( t );
420    assert( !t->isRunning );
421    assert( torrentIsLocked( t ) );
422    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
423    assert( tr_ptrArrayEmpty( &t->peers ) );
424
425    evtimer_del( &t->refillTimer );
426
427    blockIteratorFree( &t->refillQueue );
428    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
429    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
430    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
431    tr_ptrArrayDestruct( &t->peers, NULL );
432
433    tr_free( t );
434}
435
436
437static void refillPulse( int, short, void* );
438
439static void peerCallbackFunc( void * vpeer,
440                              void * vevent,
441                              void * vt );
442
443static Torrent*
444torrentConstructor( tr_peerMgr * manager,
445                    tr_torrent * tor )
446{
447    int       i;
448    Torrent * t;
449
450    t = tr_new0( Torrent, 1 );
451    t->manager = manager;
452    t->tor = tor;
453    t->pool = TR_PTR_ARRAY_INIT;
454    t->peers = TR_PTR_ARRAY_INIT;
455    t->webseeds = TR_PTR_ARRAY_INIT;
456    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
457    evtimer_set( &t->refillTimer, refillPulse, t );
458
459
460    for( i = 0; i < tor->info.webseedCount; ++i )
461    {
462        tr_webseed * w =
463            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
464        tr_ptrArrayAppend( &t->webseeds, w );
465    }
466
467    return t;
468}
469
470
471static int bandwidthPulse ( void * vmgr );
472static int rechokePulse   ( void * vmgr );
473static int reconnectPulse ( void * vmgr );
474
475tr_peerMgr*
476tr_peerMgrNew( tr_session * session )
477{
478    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
479    m->session = session;
480    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
481    return m;
482}
483
484static void
485deleteTimers( struct tr_peerMgr * m )
486{
487    if( m->bandwidthTimer )
488        tr_timerFree( &m->bandwidthTimer );
489
490    if( m->rechokeTimer )
491        tr_timerFree( &m->rechokeTimer );
492
493    if( m->reconnectTimer )
494        tr_timerFree( &m->reconnectTimer );
495}
496
497void
498tr_peerMgrFree( tr_peerMgr * manager )
499{
500    managerLock( manager );
501
502    deleteTimers( manager );
503
504    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
505     * the item from manager->handshakes, so this is a little roundabout... */
506    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
507        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
508
509    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
510
511    managerUnlock( manager );
512    tr_free( manager );
513}
514
515static int
516clientIsDownloadingFrom( const tr_peer * peer )
517{
518    return peer->clientIsInterested && !peer->clientIsChoked;
519}
520
521static int
522clientIsUploadingTo( const tr_peer * peer )
523{
524    return peer->peerIsInterested && !peer->peerIsChoked;
525}
526
527/***
528****
529***/
530
531tr_bool
532tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
533                      const tr_address  * addr )
534{
535    tr_bool isSeed = FALSE;
536    const Torrent * t = tor->torrentPeers;
537    const struct peer_atom * atom = getExistingAtom( t, addr );
538
539    if( atom )
540        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
541
542    return isSeed;
543}
544
545/****
546*****
547*****  REFILL
548*****
549****/
550
551static struct tr_blockIterator*
552blockIteratorNew( Torrent * t )
553{
554    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
555    i->pos = 0;
556    i->size = 0;
557    i->priority = TR_PRI_HIGH;
558    i->t = t;
559    return i;
560}
561
562static int
563compareIteratorItems( const void * va, const void * vb )
564{
565    const struct tr_blockIteratorItem * a = va;
566    const struct tr_blockIteratorItem * b = vb;
567    if( a->block < b->block ) return -1;
568    if( a->block > b->block ) return 1;
569    return 0;
570}
571
572static void
573blockIteratorRefill( struct tr_blockIterator * it )
574{
575    int pieceCount;
576    tr_piece_index_t * pieces;
577    const tr_torrent * tor = it->t->tor;
578    const tr_info * inf = tr_torrentInfo( tor );
579
580    /* build a pool of the pieces we might request blocks from */
581    pieces = tr_new( tr_piece_index_t, inf->pieceCount );
582    pieceCount = 0;
583    {
584        tr_piece_index_t i;
585        for( i=0; i<inf->pieceCount; ++i )
586            if( !inf->pieces[i].dnd )
587                if( inf->pieces[i].priority == it->priority )
588                    if( !tr_cpPieceIsComplete( &tor->completion, i ) )
589                        pieces[pieceCount++] = i;
590    }
591
592    /* while we're short on blocks and there are still pieces left... */
593    while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) )
594    {
595        tr_block_index_t i;
596        tr_block_index_t b;
597        tr_block_index_t e;
598
599        /* pull a random piece out of the pool */
600        const int poolIndex = tr_cryptoRandInt( pieceCount );
601        const tr_piece_index_t piece = pieces[poolIndex];
602        pieces[poolIndex] = pieces[--pieceCount];
603
604        /* add the piece's blocks that we don't have to our iterator */
605        b = tr_torPieceFirstBlock( tor, piece );
606        e = b + tr_torPieceCountBlocks( tor, piece );
607        for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) {
608            if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) {
609                int pos;
610                struct tr_blockIteratorItem tmp;
611                tr_bool match;
612                tmp.block = i;
613                tmp.requestCount = 0;
614                pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match );
615                if( match )
616                    continue;
617                memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) );
618                it->items[pos] = tmp;
619                ++it->size;
620            }
621        }
622    }
623
624    if( it->pos >= it->size )
625        it->pos = 0;
626
627    /* cleanup */
628    tr_free( pieces );
629}
630
631static void
632blockIteratorRewind( struct tr_blockIterator * it )
633{
634    /* if we don't have any blocks in the iterator,
635     * try to regenerate a list of blocks in the current priority.
636     * if that fails, go to the next lower priority and retry. */
637    for( ;; ) {
638        if( it->size <= ITERATOR_LOW_MARK )
639            blockIteratorRefill( it );
640        if( it->size > 0 )
641            break;
642        if( it->priority == TR_PRI_LOW )
643            return;
644        --it->priority; /* try a lower priority */
645    }
646
647    it->begin = it->pos;
648    it->didLoop = FALSE;
649}
650
651static tr_bool
652blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme )
653{
654    static const int single_req_threshold = 100;
655    static const int double_req_threshold = 50;
656
657    while( !it->didLoop )
658    {
659        ++it->pos;
660        it->pos %= it->size;
661        it->didLoop |= it->pos == it->begin;
662
663        if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) )
664            continue;
665        if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) )
666            continue;
667
668        *setme = it->items[it->pos].block;
669        return TRUE;
670    }
671
672    return FALSE;
673}
674static void
675blockIteratorInvalidate( struct tr_blockIterator * it )
676{
677    it->size = 0;
678    it->priority = TR_PRI_HIGH;
679}
680
681void
682tr_peerMgrFilePrioritiesChanged( tr_torrent * tor )
683{
684    if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) )
685        blockIteratorInvalidate( tor->torrentPeers->refillQueue );
686}
687
688static void
689blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block )
690{
691    if( it != NULL )
692    {
693        struct tr_blockIteratorItem tmp, *pos;
694        tmp.block = block;
695        pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems );
696        if( pos != NULL )
697        {
698            const int i = pos - it->items;
699
700            assert( pos->block == block );
701            assert( i >= 0 );
702            assert( i < it->size );
703
704            --it->size;
705
706            memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) );
707
708            if( it->pos > i )
709                --it->pos;
710        }
711    }
712}
713
714static void
715blockIteratorFree( struct tr_blockIterator ** inout )
716{
717    tr_free( *inout );
718    *inout = NULL;
719}
720
721static tr_peer**
722getPeersUploadingToClient( Torrent * t, int * setmeCount )
723{
724    int j;
725    int peerCount = 0;
726    int retCount = 0;
727    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
728    tr_peer ** ret = tr_new( tr_peer *, peerCount );
729
730    j = 0; /* this is a temporary test to make sure we walk through all the peers */
731    if( peerCount )
732    {
733        /* Get a list of peers we're downloading from.
734           Pick a different starting point each time so all peers
735           get a chance at being the first in line */
736        const int fencepost = tr_cryptoWeakRandInt( peerCount );
737        int i = fencepost;
738        do {
739            if( clientIsDownloadingFrom( peers[i] ) )
740                ret[retCount++] = peers[i];
741            i = ( i + 1 ) % peerCount;
742            ++j;
743        } while( i != fencepost );
744    }
745    assert( j == peerCount );
746    *setmeCount = retCount;
747    return ret;
748}
749
750static uint32_t
751getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
752{
753    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
754    const uint64_t blockPos = tor->blockSize * b;
755    assert( blockPos >= piecePos );
756    return (uint32_t)( blockPos - piecePos );
757}
758
759static void
760sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
761
762static void
763refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
764{
765    tr_block_index_t block;
766    int peerCount;
767    int webseedCount;
768    tr_peer ** peers;
769    tr_webseed ** webseeds;
770    Torrent * t = vtorrent;
771    tr_torrent * tor = t->tor;
772    tr_bool hasNext = TRUE;
773
774    if( !t->isRunning )
775        return;
776    if( tr_torrentIsSeed( t->tor ) ) {
777        blockIteratorFree( &t->refillQueue );
778        return;
779    }
780
781    torrentLock( t );
782    tordbg( t, "Refilling Request Buffers..." );
783
784    if( t->refillQueue == NULL )
785        t->refillQueue = blockIteratorNew( t );
786
787    peers = getPeersUploadingToClient( t, &peerCount );
788    sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );
789    webseedCount = tr_ptrArraySize( &t->webseeds );
790    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
791                          webseedCount * sizeof( tr_webseed* ) );
792
793    blockIteratorRewind( t->refillQueue );
794
795    while( ( webseedCount || peerCount )
796        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
797    {
798        int j;
799        tr_bool handled = FALSE;
800
801        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
802        const uint32_t offset = getBlockOffsetInPiece( tor, block );
803        const uint32_t length = tr_torBlockCountBytes( tor, block );
804
805        assert( block < tor->blockCount );
806
807        /* find a peer who can ask for this block */
808        for( j=0; !handled && j<peerCount; )
809        {
810            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
811            switch( val )
812            {
813                case TR_ADDREQ_FULL:
814                case TR_ADDREQ_CLIENT_CHOKED:
815                    peers[j] = peers[--peerCount];
816                    break;
817
818                case TR_ADDREQ_MISSING:
819                case TR_ADDREQ_DUPLICATE:
820                    ++j;
821                    break;
822
823                case TR_ADDREQ_OK:
824                    ++t->refillQueue->items[t->refillQueue->pos].requestCount;
825                    handled = TRUE;
826                    break;
827
828                default:
829                    assert( 0 && "unhandled value" );
830                    break;
831            }
832        }
833
834        /* maybe one of the webseeds can do it */
835        for( j=0; !handled && j<webseedCount; )
836        {
837            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
838            switch( val )
839            {
840                case TR_ADDREQ_FULL:
841                    webseeds[j] = webseeds[--webseedCount];
842                    break;
843
844                case TR_ADDREQ_OK:
845                    handled = TRUE;
846                    break;
847
848                default:
849                    assert( 0 && "unhandled value" );
850                    break;
851            }
852        }
853    }
854
855    /* cleanup */
856    tr_free( webseeds );
857    tr_free( peers );
858
859    torrentUnlock( t );
860}
861
862static void
863broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
864{
865    size_t i;
866    size_t peerCount;
867    tr_peer ** peers;
868
869    assert( torrentIsLocked( t ) );
870
871    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
872
873    peerCount = tr_ptrArraySize( &t->peers );
874    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
875    for( i=0; i<peerCount; ++i )
876        if( peers[i]->msgs )
877            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
878}
879
880static void
881addStrike( Torrent * t, tr_peer * peer )
882{
883    tordbg( t, "increasing peer %s strike count to %d",
884            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
885
886    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
887    {
888        struct peer_atom * atom = peer->atom;
889        atom->myflags |= MYFLAG_BANNED;
890        peer->doPurge = 1;
891        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
892    }
893}
894
895static void
896gotBadPiece( Torrent *        t,
897             tr_piece_index_t pieceIndex )
898{
899    tr_torrent *   tor = t->tor;
900    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
901
902    tor->corruptCur += byteCount;
903    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
904}
905
906static void
907refillSoon( Torrent * t )
908{
909    if( !evtimer_pending( &t->refillTimer, NULL ) )
910        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
911}
912
913static void
914peerSuggestedPiece( Torrent            * t UNUSED,
915                    tr_peer            * peer UNUSED,
916                    tr_piece_index_t     pieceIndex UNUSED,
917                    int                  isFastAllowed UNUSED )
918{
919#if 0
920    assert( t );
921    assert( peer );
922    assert( peer->msgs );
923
924    /* is this a valid piece? */
925    if(  pieceIndex >= t->tor->info.pieceCount )
926        return;
927
928    /* don't ask for it if we've already got it */
929    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
930        return;
931
932    /* don't ask for it if they don't have it */
933    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
934        return;
935
936    /* don't ask for it if we're choked and it's not fast */
937    if( !isFastAllowed && peer->clientIsChoked )
938        return;
939
940    /* request the blocks that we don't have in this piece */
941    {
942        tr_block_index_t block;
943        const tr_torrent * tor = t->tor;
944        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
945        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
946
947        for( block=start; block<end; ++block )
948        {
949            if( !tr_cpBlockIsComplete( tor->completion, block ) )
950            {
951                const uint32_t offset = getBlockOffsetInPiece( tor, block );
952                const uint32_t length = tr_torBlockCountBytes( tor, block );
953                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
954                incrementPieceRequests( t, pieceIndex );
955            }
956        }
957    }
958#endif
959}
960
961static void
962peerCallbackFunc( void * vpeer, void * vevent, void * vt )
963{
964    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
965    Torrent * t = vt;
966    const tr_peer_event * e = vevent;
967
968    torrentLock( t );
969
970    switch( e->eventType )
971    {
972        case TR_PEER_UPLOAD_ONLY:
973            /* update our atom */
974            if( peer ) {
975                if( e->uploadOnly ) {
976                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
977                    peer->atom->flags |= ADDED_F_SEED_FLAG;
978                } else {
979                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
980                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
981                }
982            }
983            break;
984
985        case TR_PEER_NEED_REQ:
986            refillSoon( t );
987            break;
988
989        case TR_PEER_CANCEL:
990            blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ) );
991            break;
992
993        case TR_PEER_PEER_GOT_DATA:
994        {
995            const time_t now = time( NULL );
996            tr_torrent * tor = t->tor;
997
998            tr_torrentSetActivityDate( tor, now );
999
1000            if( e->wasPieceData ) {
1001                tor->uploadedCur += e->length;
1002                tr_torrentSetDirty( tor );
1003            }
1004
1005            /* update the stats */
1006            if( e->wasPieceData )
1007                tr_statsAddUploaded( tor->session, e->length );
1008
1009            /* update our atom */
1010            if( peer && e->wasPieceData )
1011                peer->atom->piece_data_time = now;
1012
1013            tor->needsSeedRatioCheck = TRUE;
1014
1015            break;
1016        }
1017
1018        case TR_PEER_CLIENT_GOT_PORT:
1019            if( peer )
1020                peer->atom->port = e->port;
1021            break;
1022
1023        case TR_PEER_CLIENT_GOT_SUGGEST:
1024            if( peer )
1025                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1026            break;
1027
1028        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1029            if( peer )
1030                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1031            break;
1032
1033        case TR_PEER_CLIENT_GOT_DATA:
1034        {
1035            const time_t now = time( NULL );
1036            tr_torrent * tor = t->tor;
1037
1038            tr_torrentSetActivityDate( tor, now );
1039
1040            /* only add this to downloadedCur if we got it from a peer --
1041             * webseeds shouldn't count against our ratio.  As one tracker
1042             * admin put it, "Those pieces are downloaded directly from the
1043             * content distributor, not the peers, it is the tracker's job
1044             * to manage the swarms, not the web server and does not fit
1045             * into the jurisdiction of the tracker." */
1046            if( peer && e->wasPieceData ) {
1047                tor->downloadedCur += e->length;
1048                tr_torrentSetDirty( tor );
1049            }
1050
1051            /* update the stats */
1052            if( e->wasPieceData )
1053                tr_statsAddDownloaded( tor->session, e->length );
1054
1055            /* update our atom */
1056            if( peer && e->wasPieceData )
1057                peer->atom->piece_data_time = now;
1058
1059            break;
1060        }
1061
1062        case TR_PEER_PEER_PROGRESS:
1063        {
1064            if( peer )
1065            {
1066                struct peer_atom * atom = peer->atom;
1067                if( e->progress >= 1.0 ) {
1068                    tordbg( t, "marking peer %s as a seed",
1069                            tr_atomAddrStr( atom ) );
1070                    atom->flags |= ADDED_F_SEED_FLAG;
1071                }
1072            }
1073            break;
1074        }
1075
1076        case TR_PEER_CLIENT_GOT_BLOCK:
1077        {
1078            tr_torrent * tor = t->tor;
1079
1080            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1081
1082            tr_cpBlockAdd( &tor->completion, block );
1083            tr_torrentSetDirty( tor );
1084            blockIteratorRemoveBlock( t->refillQueue, block );
1085
1086            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1087
1088            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1089            {
1090                const tr_piece_index_t p = e->pieceIndex;
1091                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1092
1093                if( !ok )
1094                {
1095                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1096                               (unsigned long)p );
1097                }
1098
1099                tr_torrentSetHasPiece( tor, p, ok );
1100                tr_torrentSetPieceChecked( tor, p, TRUE );
1101                tr_peerMgrSetBlame( tor, p, ok );
1102
1103                if( !ok )
1104                {
1105                    gotBadPiece( t, p );
1106                }
1107                else
1108                {
1109                    int i;
1110                    int peerCount;
1111                    tr_peer ** peers;
1112                    tr_file_index_t fileIndex;
1113
1114                    peerCount = tr_ptrArraySize( &t->peers );
1115                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1116                    for( i=0; i<peerCount; ++i )
1117                        tr_peerMsgsHave( peers[i]->msgs, p );
1118
1119                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1120                        const tr_file * file = &tor->info.files[fileIndex];
1121                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1122                            if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1123                                tr_torrentFileCompleted( tor, fileIndex );
1124                    }
1125                }
1126            }
1127            break;
1128        }
1129
1130        case TR_PEER_ERROR:
1131            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1132            {
1133                /* some protocol error from the peer */
1134                peer->doPurge = 1;
1135                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1136                        tr_atomAddrStr( peer->atom ) );
1137            }
1138            else 
1139            {
1140                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1141            }
1142            break;
1143
1144        default:
1145            assert( 0 );
1146    }
1147
1148    torrentUnlock( t );
1149}
1150
1151static void
1152ensureAtomExists( Torrent          * t,
1153                  const tr_address * addr,
1154                  tr_port            port,
1155                  uint8_t            flags,
1156                  uint8_t            from )
1157{
1158    assert( tr_isAddress( addr ) );
1159    assert( from < TR_PEER_FROM__MAX );
1160
1161    if( getExistingAtom( t, addr ) == NULL )
1162    {
1163        struct peer_atom * a;
1164        a = tr_new0( struct peer_atom, 1 );
1165        a->addr = *addr;
1166        a->port = port;
1167        a->flags = flags;
1168        a->from = from;
1169        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1170        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1171    }
1172}
1173
1174static int
1175getMaxPeerCount( const tr_torrent * tor )
1176{
1177    return tor->maxConnectedPeers;
1178}
1179
1180static int
1181getPeerCount( const Torrent * t )
1182{
1183    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1184}
1185
1186/* FIXME: this is kind of a mess. */
1187static tr_bool
1188myHandshakeDoneCB( tr_handshake  * handshake,
1189                   tr_peerIo     * io,
1190                   tr_bool         isConnected,
1191                   const uint8_t * peer_id,
1192                   void          * vmanager )
1193{
1194    tr_bool            ok = isConnected;
1195    tr_bool            success = FALSE;
1196    tr_port            port;
1197    const tr_address * addr;
1198    tr_peerMgr       * manager = vmanager;
1199    Torrent          * t;
1200    tr_handshake     * ours;
1201
1202    assert( io );
1203    assert( tr_isBool( ok ) );
1204
1205    t = tr_peerIoHasTorrentHash( io )
1206        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1207        : NULL;
1208
1209    if( tr_peerIoIsIncoming ( io ) )
1210        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1211                                        handshake, handshakeCompare );
1212    else if( t )
1213        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1214                                        handshake, handshakeCompare );
1215    else
1216        ours = handshake;
1217
1218    assert( ours );
1219    assert( ours == handshake );
1220
1221    if( t )
1222        torrentLock( t );
1223
1224    addr = tr_peerIoGetAddress( io, &port );
1225
1226    if( !ok || !t || !t->isRunning )
1227    {
1228        if( t )
1229        {
1230            struct peer_atom * atom = getExistingAtom( t, addr );
1231            if( atom )
1232                ++atom->numFails;
1233        }
1234    }
1235    else /* looking good */
1236    {
1237        struct peer_atom * atom;
1238        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1239        atom = getExistingAtom( t, addr );
1240        atom->time = time( NULL );
1241        atom->piece_data_time = 0;
1242
1243        if( atom->myflags & MYFLAG_BANNED )
1244        {
1245            tordbg( t, "banned peer %s tried to reconnect",
1246                    tr_atomAddrStr( atom ) );
1247        }
1248        else if( tr_peerIoIsIncoming( io )
1249               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1250
1251        {
1252        }
1253        else
1254        {
1255            tr_peer * peer = getExistingPeer( t, addr );
1256
1257            if( peer ) /* we already have this peer */
1258            {
1259            }
1260            else
1261            {
1262                peer = getPeer( t, atom );
1263                tr_free( peer->client );
1264
1265                if( !peer_id )
1266                    peer->client = NULL;
1267                else {
1268                    char client[128];
1269                    tr_clientForId( client, sizeof( client ), peer_id );
1270                    peer->client = tr_strdup( client );
1271                }
1272
1273                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1274                                                                balanced by our unref in peerDestructor()  */
1275                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1276                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1277
1278                success = TRUE;
1279            }
1280        }
1281    }
1282
1283    if( t )
1284        torrentUnlock( t );
1285
1286    return success;
1287}
1288
1289void
1290tr_peerMgrAddIncoming( tr_peerMgr * manager,
1291                       tr_address * addr,
1292                       tr_port      port,
1293                       int          socket )
1294{
1295    tr_session * session;
1296
1297    managerLock( manager );
1298
1299    assert( tr_isSession( manager->session ) );
1300    session = manager->session;
1301
1302    if( tr_sessionIsAddressBlocked( session, addr ) )
1303    {
1304        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1305        tr_netClose( session, socket );
1306    }
1307    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1308    {
1309        tr_netClose( session, socket );
1310    }
1311    else /* we don't have a connetion to them yet... */
1312    {
1313        tr_peerIo *    io;
1314        tr_handshake * handshake;
1315
1316        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1317
1318        handshake = tr_handshakeNew( io,
1319                                     session->encryptionMode,
1320                                     myHandshakeDoneCB,
1321                                     manager );
1322
1323        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1324
1325        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1326                                 handshakeCompare );
1327    }
1328
1329    managerUnlock( manager );
1330}
1331
1332static tr_bool
1333tr_isPex( const tr_pex * pex )
1334{
1335    return pex && tr_isAddress( &pex->addr );
1336}
1337
1338void
1339tr_peerMgrAddPex( tr_torrent   *  tor,
1340                  uint8_t         from,
1341                  const tr_pex *  pex )
1342{
1343    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1344    {
1345        Torrent * t = tor->torrentPeers;
1346        managerLock( t->manager );
1347
1348        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1349            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1350                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1351
1352        managerUnlock( t->manager );
1353    }
1354}
1355
1356tr_pex *
1357tr_peerMgrCompactToPex( const void *    compact,
1358                        size_t          compactLen,
1359                        const uint8_t * added_f,
1360                        size_t          added_f_len,
1361                        size_t *        pexCount )
1362{
1363    size_t          i;
1364    size_t          n = compactLen / 6;
1365    const uint8_t * walk = compact;
1366    tr_pex *        pex = tr_new0( tr_pex, n );
1367
1368    for( i = 0; i < n; ++i )
1369    {
1370        pex[i].addr.type = TR_AF_INET;
1371        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1372        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1373        if( added_f && ( n == added_f_len ) )
1374            pex[i].flags = added_f[i];
1375    }
1376
1377    *pexCount = n;
1378    return pex;
1379}
1380
1381tr_pex *
1382tr_peerMgrCompact6ToPex( const void    * compact,
1383                         size_t          compactLen,
1384                         const uint8_t * added_f,
1385                         size_t          added_f_len,
1386                         size_t        * pexCount )
1387{
1388    size_t          i;
1389    size_t          n = compactLen / 18;
1390    const uint8_t * walk = compact;
1391    tr_pex *        pex = tr_new0( tr_pex, n );
1392
1393    for( i = 0; i < n; ++i )
1394    {
1395        pex[i].addr.type = TR_AF_INET6;
1396        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1397        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1398        if( added_f && ( n == added_f_len ) )
1399            pex[i].flags = added_f[i];
1400    }
1401
1402    *pexCount = n;
1403    return pex;
1404}
1405
1406tr_pex *
1407tr_peerMgrArrayToPex( const void * array,
1408                      size_t       arrayLen,
1409                      size_t      * pexCount )
1410{
1411    size_t          i;
1412    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1413    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1414    const uint8_t * walk = array;
1415    tr_pex        * pex = tr_new0( tr_pex, n );
1416
1417    for( i = 0 ; i < n ; i++ ) {
1418        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1419        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1420        pex[i].flags = 0x00;
1421        walk += sizeof( tr_address ) + 2;
1422    }
1423
1424    *pexCount = n;
1425    return pex;
1426}
1427
1428/**
1429***
1430**/
1431
1432void
1433tr_peerMgrSetBlame( tr_torrent     * tor,
1434                    tr_piece_index_t pieceIndex,
1435                    int              success )
1436{
1437    if( !success )
1438    {
1439        int        peerCount, i;
1440        Torrent *  t = tor->torrentPeers;
1441        tr_peer ** peers;
1442
1443        assert( torrentIsLocked( t ) );
1444
1445        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1446        for( i = 0; i < peerCount; ++i )
1447        {
1448            tr_peer * peer = peers[i];
1449            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1450            {
1451                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1452                        tr_atomAddrStr( peer->atom ),
1453                        pieceIndex, (int)peer->strikes + 1 );
1454                addStrike( t, peer );
1455            }
1456        }
1457    }
1458}
1459
1460int
1461tr_pexCompare( const void * va, const void * vb )
1462{
1463    const tr_pex * a = va;
1464    const tr_pex * b = vb;
1465    int i;
1466
1467    assert( tr_isPex( a ) );
1468    assert( tr_isPex( b ) );
1469
1470    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1471        return i;
1472
1473    if( a->port != b->port )
1474        return a->port < b->port ? -1 : 1;
1475
1476    return 0;
1477}
1478
1479#if 0
1480static int
1481peerPrefersCrypto( const tr_peer * peer )
1482{
1483    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1484        return TRUE;
1485
1486    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1487        return FALSE;
1488
1489    return tr_peerIoIsEncrypted( peer->io );
1490}
1491#endif
1492
1493/* better goes first */
1494static int
1495compareAtomsByUsefulness( const void * va, const void *vb )
1496{
1497    const struct peer_atom * a = * (const struct peer_atom**) va;
1498    const struct peer_atom * b = * (const struct peer_atom**) vb;
1499
1500    assert( tr_isAtom( a ) );
1501    assert( tr_isAtom( b ) );
1502
1503    if( a->piece_data_time != b->piece_data_time )
1504        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1505    if( a->from != b->from )
1506        return a->from < b->from ? -1 : 1;
1507    if( a->numFails != b->numFails )
1508        return a->numFails < b->numFails ? -1 : 1;
1509
1510    return 0;
1511}
1512
1513int
1514tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af, int maxPeerCount )
1515{
1516    int count = 0;
1517    const Torrent * t = tor->torrentPeers;
1518
1519    managerLock( t->manager );
1520
1521    {
1522        int i;
1523        const int atomCount = tr_ptrArraySize( &t->pool );
1524        const int pexCount = MIN( atomCount, maxPeerCount );
1525        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1526        struct peer_atom ** atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1527        /* for now, this will waste memory on torrents that have both
1528         * ipv6 and ipv4 peers */
1529        tr_pex * pex = tr_new0( tr_pex, atomCount );
1530        tr_pex * walk = pex;
1531
1532        qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1533
1534        for( i=0; i<atomCount && count<pexCount; ++i )
1535        {
1536            const struct peer_atom * atom = atoms[i];
1537            if( atom->addr.type == af )
1538            {
1539                assert( tr_isAddress( &atom->addr ) );
1540                walk->addr = atom->addr;
1541                walk->port = atom->port;
1542                walk->flags = atom->flags;
1543                ++count;
1544                ++walk;
1545            }
1546        }
1547
1548        assert( ( walk - pex ) == count );
1549        *setme_pex = pex;
1550
1551        tr_free( atoms );
1552    }
1553
1554    managerUnlock( t->manager );
1555    return count;
1556}
1557
1558static void
1559ensureMgrTimersExist( struct tr_peerMgr * m )
1560{
1561    tr_session * s = m->session;
1562
1563    if( m->bandwidthTimer == NULL )
1564        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1565
1566    if( m->rechokeTimer == NULL )
1567        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1568
1569    if( m->reconnectTimer == NULL )
1570        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1571}
1572
1573void
1574tr_peerMgrStartTorrent( tr_torrent * tor )
1575{
1576    Torrent * t = tor->torrentPeers;
1577
1578    assert( t != NULL );
1579    managerLock( t->manager );
1580    ensureMgrTimersExist( t->manager );
1581
1582    if( !t->isRunning )
1583    {
1584        t->isRunning = TRUE;
1585
1586        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1587            refillSoon( t );
1588    }
1589
1590    rechokePulse( t->manager );
1591    managerUnlock( t->manager );
1592}
1593
1594static void
1595stopTorrent( Torrent * t )
1596{
1597    assert( torrentIsLocked( t ) );
1598
1599    t->isRunning = FALSE;
1600
1601    /* disconnect the peers. */
1602    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1603    tr_ptrArrayClear( &t->peers );
1604
1605    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1606     * which removes the handshake from t->outgoingHandshakes... */
1607    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1608        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1609}
1610
1611void
1612tr_peerMgrStopTorrent( tr_torrent * tor )
1613{
1614    Torrent * t = tor->torrentPeers;
1615
1616    managerLock( t->manager );
1617
1618    stopTorrent( t );
1619
1620    managerUnlock( t->manager );
1621}
1622
1623void
1624tr_peerMgrAddTorrent( tr_peerMgr * manager,
1625                      tr_torrent * tor )
1626{
1627    managerLock( manager );
1628
1629    assert( tor );
1630    assert( tor->torrentPeers == NULL );
1631
1632    tor->torrentPeers = torrentConstructor( manager, tor );
1633
1634    managerUnlock( manager );
1635}
1636
1637void
1638tr_peerMgrRemoveTorrent( tr_torrent * tor )
1639{
1640    tr_torrentLock( tor );
1641
1642    stopTorrent( tor->torrentPeers );
1643    torrentDestructor( tor->torrentPeers );
1644
1645    tr_torrentUnlock( tor );
1646}
1647
1648void
1649tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1650                               int8_t           * tab,
1651                               unsigned int       tabCount )
1652{
1653    tr_piece_index_t   i;
1654    const Torrent *    t;
1655    float              interval;
1656    tr_bool            isSeed;
1657    int                peerCount;
1658    const tr_peer **   peers;
1659    tr_torrentLock( tor );
1660
1661    t = tor->torrentPeers;
1662    tor = t->tor;
1663    interval = tor->info.pieceCount / (float)tabCount;
1664    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1665    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1666    peerCount = tr_ptrArraySize( &t->peers );
1667
1668    memset( tab, 0, tabCount );
1669
1670    for( i = 0; tor && i < tabCount; ++i )
1671    {
1672        const int piece = i * interval;
1673
1674        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1675            tab[i] = -1;
1676        else if( peerCount ) {
1677            int j;
1678            for( j = 0; j < peerCount; ++j )
1679                if( tr_bitfieldHas( peers[j]->have, i ) )
1680                    ++tab[i];
1681        }
1682    }
1683
1684    tr_torrentUnlock( tor );
1685}
1686
1687/* Returns the pieces that are available from peers */
1688tr_bitfield*
1689tr_peerMgrGetAvailable( const tr_torrent * tor )
1690{
1691    int i;
1692    int peerCount;
1693    Torrent * t = tor->torrentPeers;
1694    const tr_peer ** peers;
1695    tr_bitfield * pieces;
1696    managerLock( t->manager );
1697
1698    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1699    peerCount = tr_ptrArraySize( &t->peers );
1700    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1701    for( i=0; i<peerCount; ++i )
1702        tr_bitfieldOr( pieces, peers[i]->have );
1703
1704    managerUnlock( t->manager );
1705    return pieces;
1706}
1707
1708void
1709tr_peerMgrTorrentStats( tr_torrent       * tor,
1710                        int              * setmePeersKnown,
1711                        int              * setmePeersConnected,
1712                        int              * setmeSeedsConnected,
1713                        int              * setmeWebseedsSendingToUs,
1714                        int              * setmePeersSendingToUs,
1715                        int              * setmePeersGettingFromUs,
1716                        int              * setmePeersFrom )
1717{
1718    int i, size;
1719    const Torrent * t = tor->torrentPeers;
1720    const tr_peer ** peers;
1721    const tr_webseed ** webseeds;
1722
1723    managerLock( t->manager );
1724
1725    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1726    size = tr_ptrArraySize( &t->peers );
1727
1728    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1729    *setmePeersConnected       = 0;
1730    *setmeSeedsConnected       = 0;
1731    *setmePeersGettingFromUs   = 0;
1732    *setmePeersSendingToUs     = 0;
1733    *setmeWebseedsSendingToUs  = 0;
1734
1735    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1736        setmePeersFrom[i] = 0;
1737
1738    for( i=0; i<size; ++i )
1739    {
1740        const tr_peer * peer = peers[i];
1741        const struct peer_atom * atom = peer->atom;
1742
1743        if( peer->io == NULL ) /* not connected */
1744            continue;
1745
1746        ++*setmePeersConnected;
1747
1748        ++setmePeersFrom[atom->from];
1749
1750        if( clientIsDownloadingFrom( peer ) )
1751            ++*setmePeersSendingToUs;
1752
1753        if( clientIsUploadingTo( peer ) )
1754            ++*setmePeersGettingFromUs;
1755
1756        if( atom->flags & ADDED_F_SEED_FLAG )
1757            ++*setmeSeedsConnected;
1758    }
1759
1760    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1761    size = tr_ptrArraySize( &t->webseeds );
1762    for( i=0; i<size; ++i )
1763        if( tr_webseedIsActive( webseeds[i] ) )
1764            ++*setmeWebseedsSendingToUs;
1765
1766    managerUnlock( t->manager );
1767}
1768
1769float
1770tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1771{
1772    int i;
1773    float tmp;
1774    float ret = 0;
1775
1776    const Torrent * t = tor->torrentPeers;
1777    const int n = tr_ptrArraySize( &t->webseeds );
1778    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1779
1780    for( i=0; i<n; ++i )
1781        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1782            ret += tmp;
1783
1784    return ret;
1785}
1786
1787
1788float*
1789tr_peerMgrWebSpeeds( const tr_torrent * tor )
1790{
1791    const Torrent * t = tor->torrentPeers;
1792    const tr_webseed ** webseeds;
1793    int i;
1794    int webseedCount;
1795    float * ret;
1796    uint64_t now;
1797
1798    assert( t->manager );
1799    managerLock( t->manager );
1800
1801    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1802    webseedCount = tr_ptrArraySize( &t->webseeds );
1803    assert( webseedCount == tor->info.webseedCount );
1804    ret = tr_new0( float, webseedCount );
1805    now = tr_date( );
1806
1807    for( i=0; i<webseedCount; ++i )
1808        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1809            ret[i] = -1.0;
1810
1811    managerUnlock( t->manager );
1812    return ret;
1813}
1814
1815double
1816tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1817{
1818    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1819}
1820
1821
1822struct tr_peer_stat *
1823tr_peerMgrPeerStats( const tr_torrent    * tor,
1824                     int                 * setmeCount )
1825{
1826    int i, size;
1827    const Torrent * t = tor->torrentPeers;
1828    const tr_peer ** peers;
1829    tr_peer_stat * ret;
1830    uint64_t now;
1831
1832    assert( t->manager );
1833    managerLock( t->manager );
1834
1835    size = tr_ptrArraySize( &t->peers );
1836    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1837    ret = tr_new0( tr_peer_stat, size );
1838    now = tr_date( );
1839
1840    for( i=0; i<size; ++i )
1841    {
1842        char *                   pch;
1843        const tr_peer *          peer = peers[i];
1844        const struct peer_atom * atom = peer->atom;
1845        tr_peer_stat *           stat = ret + i;
1846
1847        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
1848        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1849                   sizeof( stat->client ) );
1850        stat->port               = ntohs( peer->atom->port );
1851        stat->from               = atom->from;
1852        stat->progress           = peer->progress;
1853        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1854        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1855        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1856        stat->peerIsChoked       = peer->peerIsChoked;
1857        stat->peerIsInterested   = peer->peerIsInterested;
1858        stat->clientIsChoked     = peer->clientIsChoked;
1859        stat->clientIsInterested = peer->clientIsInterested;
1860        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1861        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1862        stat->isUploadingTo      = clientIsUploadingTo( peer );
1863        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1864
1865        pch = stat->flagStr;
1866        if( t->optimistic == peer ) *pch++ = 'O';
1867        if( stat->isDownloadingFrom ) *pch++ = 'D';
1868        else if( stat->clientIsInterested ) *pch++ = 'd';
1869        if( stat->isUploadingTo ) *pch++ = 'U';
1870        else if( stat->peerIsInterested ) *pch++ = 'u';
1871        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1872        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1873        if( stat->isEncrypted ) *pch++ = 'E';
1874        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1875        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1876        if( stat->isIncoming ) *pch++ = 'I';
1877        *pch = '\0';
1878    }
1879
1880    *setmeCount = size;
1881
1882    managerUnlock( t->manager );
1883    return ret;
1884}
1885
1886/**
1887***
1888**/
1889
1890struct ChokeData
1891{
1892    tr_bool         doUnchoke;
1893    tr_bool         isInterested;
1894    tr_bool         isChoked;
1895    int             rate;
1896    tr_peer *       peer;
1897};
1898
1899static int
1900compareChoke( const void * va,
1901              const void * vb )
1902{
1903    const struct ChokeData * a = va;
1904    const struct ChokeData * b = vb;
1905
1906    if( a->rate != b->rate ) /* prefer higher overall speeds */
1907        return a->rate > b->rate ? -1 : 1;
1908
1909    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1910        return a->isChoked ? 1 : -1;
1911
1912    return 0;
1913}
1914
1915static int
1916isNew( const tr_peer * peer )
1917{
1918    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1919}
1920
1921static int
1922isSame( const tr_peer * peer )
1923{
1924    return peer && peer->client && strstr( peer->client, "Transmission" );
1925}
1926
1927/**
1928***
1929**/
1930
1931static void
1932rechokeTorrent( Torrent * t, const uint64_t now )
1933{
1934    int i, size, unchokedInterested;
1935    const int peerCount = tr_ptrArraySize( &t->peers );
1936    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1937    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1938    const tr_session * session = t->manager->session;
1939    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1940
1941    assert( torrentIsLocked( t ) );
1942
1943    /* sort the peers by preference and rate */
1944    for( i = 0, size = 0; i < peerCount; ++i )
1945    {
1946        tr_peer * peer = peers[i];
1947        struct peer_atom * atom = peer->atom;
1948
1949        if( peer->progress >= 1.0 ) /* choke all seeds */
1950        {
1951            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1952        }
1953        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1954        {
1955            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1956        }
1957        else if( chokeAll ) /* choke everyone if we're not uploading */
1958        {
1959            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1960        }
1961        else
1962        {
1963            struct ChokeData * n = &choke[size++];
1964            n->peer         = peer;
1965            n->isInterested = peer->peerIsInterested;
1966            n->isChoked     = peer->peerIsChoked;
1967            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1968        }
1969    }
1970
1971    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1972
1973    /**
1974     * Reciprocation and number of uploads capping is managed by unchoking
1975     * the N peers which have the best upload rate and are interested.
1976     * This maximizes the client's download rate. These N peers are
1977     * referred to as downloaders, because they are interested in downloading
1978     * from the client.
1979     *
1980     * Peers which have a better upload rate (as compared to the downloaders)
1981     * but aren't interested get unchoked. If they become interested, the
1982     * downloader with the worst upload rate gets choked. If a client has
1983     * a complete file, it uses its upload rate rather than its download
1984     * rate to decide which peers to unchoke.
1985     */
1986    unchokedInterested = 0;
1987    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1988        choke[i].doUnchoke = 1;
1989        if( choke[i].isInterested )
1990            ++unchokedInterested;
1991    }
1992
1993    /* optimistic unchoke */
1994    if( i < size )
1995    {
1996        int n;
1997        struct ChokeData * c;
1998        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1999
2000        for( ; i<size; ++i )
2001        {
2002            if( choke[i].isInterested )
2003            {
2004                const tr_peer * peer = choke[i].peer;
2005                int x = 1, y;
2006                if( isNew( peer ) ) x *= 3;
2007                if( isSame( peer ) ) x *= 3;
2008                for( y=0; y<x; ++y )
2009                    tr_ptrArrayAppend( &randPool, &choke[i] );
2010            }
2011        }
2012
2013        if(( n = tr_ptrArraySize( &randPool )))
2014        {
2015            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2016            c->doUnchoke = 1;
2017            t->optimistic = c->peer;
2018        }
2019
2020        tr_ptrArrayDestruct( &randPool, NULL );
2021    }
2022
2023    for( i=0; i<size; ++i )
2024        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2025
2026    /* cleanup */
2027    tr_free( choke );
2028}
2029
2030static int
2031rechokePulse( void * vmgr )
2032{
2033    uint64_t now;
2034    tr_torrent * tor = NULL;
2035    tr_peerMgr * mgr = vmgr;
2036    managerLock( mgr );
2037
2038    now = tr_date( );
2039    while(( tor = tr_torrentNext( mgr->session, tor )))
2040        if( tor->isRunning )
2041            rechokeTorrent( tor->torrentPeers, now );
2042
2043    managerUnlock( mgr );
2044    return TRUE;
2045}
2046
2047/***
2048****
2049****  Life and Death
2050****
2051***/
2052
2053typedef enum
2054{
2055    TR_CAN_KEEP,
2056    TR_CAN_CLOSE,
2057    TR_MUST_CLOSE,
2058}
2059tr_close_type_t;
2060
2061static tr_close_type_t
2062shouldPeerBeClosed( const Torrent    * t,
2063                    const tr_peer    * peer,
2064                    int                peerCount )
2065{
2066    const tr_torrent *       tor = t->tor;
2067    const time_t             now = time( NULL );
2068    const struct peer_atom * atom = peer->atom;
2069
2070    /* if it's marked for purging, close it */
2071    if( peer->doPurge )
2072    {
2073        tordbg( t, "purging peer %s because its doPurge flag is set",
2074                tr_atomAddrStr( atom ) );
2075        return TR_MUST_CLOSE;
2076    }
2077
2078    /* if we're seeding and the peer has everything we have,
2079     * and enough time has passed for a pex exchange, then disconnect */
2080    if( tr_torrentIsSeed( tor ) )
2081    {
2082        int peerHasEverything;
2083        if( atom->flags & ADDED_F_SEED_FLAG )
2084            peerHasEverything = TRUE;
2085        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2086            peerHasEverything = FALSE;
2087        else {
2088            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2089            tr_bitfieldDifference( tmp, peer->have );
2090            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2091            tr_bitfieldFree( tmp );
2092        }
2093
2094        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2095        {
2096            tordbg( t, "purging peer %s because we're both seeds",
2097                    tr_atomAddrStr( atom ) );
2098            return TR_MUST_CLOSE;
2099        }
2100    }
2101
2102    /* disconnect if it's been too long since piece data has been transferred.
2103     * this is on a sliding scale based on number of available peers... */
2104    {
2105        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2106        /* if we have >= relaxIfFewerThan, strictness is 100%.
2107         * if we have zero connections, strictness is 0% */
2108        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2109                               ? 1.0
2110                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2111        const int lo = MIN_UPLOAD_IDLE_SECS;
2112        const int hi = MAX_UPLOAD_IDLE_SECS;
2113        const int limit = hi - ( ( hi - lo ) * strictness );
2114        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2115/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2116        if( idleTime > limit ) {
2117            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2118                       tr_atomAddrStr( atom ), idleTime );
2119            return TR_CAN_CLOSE;
2120        }
2121    }
2122
2123    return TR_CAN_KEEP;
2124}
2125
2126static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2127
2128static tr_peer **
2129getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2130{
2131    int i, peerCount, outsize;
2132    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2133    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2134
2135    assert( torrentIsLocked( t ) );
2136
2137    for( i = outsize = 0; i < peerCount; ++i )
2138        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2139            ret[outsize++] = peers[i];
2140
2141    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2142
2143    *setmeSize = outsize;
2144    return ret;
2145}
2146
2147static int
2148compareCandidates( const void * va, const void * vb )
2149{
2150    const struct peer_atom * a = *(const struct peer_atom**) va;
2151    const struct peer_atom * b = *(const struct peer_atom**) vb;
2152
2153    /* <Charles> Here we would probably want to try reconnecting to
2154     * peers that had most recently given us data. Lots of users have
2155     * trouble with resets due to their routers and/or ISPs. This way we
2156     * can quickly recover from an unwanted reset. So we sort
2157     * piece_data_time in descending order.
2158     */
2159
2160    if( a->piece_data_time != b->piece_data_time )
2161        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2162
2163    if( a->numFails != b->numFails )
2164        return a->numFails < b->numFails ? -1 : 1;
2165
2166    if( a->time != b->time )
2167        return a->time < b->time ? -1 : 1;
2168
2169    /* In order to avoid fragmenting the swarm, peers from trackers and
2170     * from the DHT should be preferred to peers from PEX. */
2171    if( a->from != b->from )
2172        return a->from < b->from ? -1 : 1;
2173
2174    return 0;
2175}
2176
2177static int
2178getReconnectIntervalSecs( const struct peer_atom * atom )
2179{
2180    int          sec;
2181    const time_t now = time( NULL );
2182
2183    /* if we were recently connected to this peer and transferring piece
2184     * data, try to reconnect to them sooner rather that later -- we don't
2185     * want network troubles to get in the way of a good peer. */
2186    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2187        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2188
2189    /* don't allow reconnects more often than our minimum */
2190    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2191        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2192
2193    /* otherwise, the interval depends on how many times we've tried
2194     * and failed to connect to the peer */
2195    else switch( atom->numFails ) {
2196        case 0: sec = 0; break;
2197        case 1: sec = 5; break;
2198        case 2: sec = 2 * 60; break;
2199        case 3: sec = 15 * 60; break;
2200        case 4: sec = 30 * 60; break;
2201        case 5: sec = 60 * 60; break;
2202        default: sec = 120 * 60; break;
2203    }
2204
2205    return sec;
2206}
2207
2208static struct peer_atom **
2209getPeerCandidates( Torrent * t, int * setmeSize )
2210{
2211    int                 i, atomCount, retCount;
2212    struct peer_atom ** atoms;
2213    struct peer_atom ** ret;
2214    const time_t        now = time( NULL );
2215    const int           seed = tr_torrentIsSeed( t->tor );
2216
2217    assert( torrentIsLocked( t ) );
2218
2219    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2220    ret = tr_new( struct peer_atom*, atomCount );
2221    for( i = retCount = 0; i < atomCount; ++i )
2222    {
2223        int                interval;
2224        struct peer_atom * atom = atoms[i];
2225
2226        /* peer fed us too much bad data ... we only keep it around
2227         * now to weed it out in case someone sends it to us via pex */
2228        if( atom->myflags & MYFLAG_BANNED )
2229            continue;
2230
2231        /* peer was unconnectable before, so we're not going to keep trying.
2232         * this is needs a separate flag from `banned', since if they try
2233         * to connect to us later, we'll let them in */
2234        if( atom->myflags & MYFLAG_UNREACHABLE )
2235            continue;
2236
2237        /* we don't need two connections to the same peer... */
2238        if( peerIsInUse( t, &atom->addr ) )
2239            continue;
2240
2241        /* no need to connect if we're both seeds... */
2242        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2243                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2244            continue;
2245
2246        /* don't reconnect too often */
2247        interval = getReconnectIntervalSecs( atom );
2248        if( ( now - atom->time ) < interval )
2249        {
2250            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2251                    i, tr_atomAddrStr( atom ), interval );
2252            continue;
2253        }
2254
2255        /* Don't connect to peers in our blocklist */
2256        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2257            continue;
2258
2259        ret[retCount++] = atom;
2260    }
2261
2262    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2263    *setmeSize = retCount;
2264    return ret;
2265}
2266
2267static void
2268closePeer( Torrent * t, tr_peer * peer )
2269{
2270    struct peer_atom * atom;
2271
2272    assert( t != NULL );
2273    assert( peer != NULL );
2274
2275    atom = peer->atom;
2276
2277    /* if we transferred piece data, then they might be good peers,
2278       so reset their `numFails' weight to zero.  otherwise we connected
2279       to them fruitlessly, so mark it as another fail */
2280    if( atom->piece_data_time )
2281        atom->numFails = 0;
2282    else
2283        ++atom->numFails;
2284
2285    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2286    removePeer( t, peer );
2287}
2288
2289static void
2290reconnectTorrent( Torrent * t )
2291{
2292    static time_t prevTime = 0;
2293    static int    newConnectionsThisSecond = 0;
2294    time_t        now;
2295
2296    now = time( NULL );
2297    if( prevTime != now )
2298    {
2299        prevTime = now;
2300        newConnectionsThisSecond = 0;
2301    }
2302
2303    if( !t->isRunning )
2304    {
2305        removeAllPeers( t );
2306    }
2307    else
2308    {
2309        int i;
2310        int canCloseCount;
2311        int mustCloseCount;
2312        int candidateCount;
2313        int maxCandidates;
2314        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2315        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2316        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2317
2318        tordbg( t, "reconnect pulse for [%s]: "
2319                   "%d must-close connections, "
2320                   "%d can-close connections, "
2321                   "%d connection candidates, "
2322                   "%d atoms, "
2323                   "max per pulse is %d",
2324                   tr_torrentName( t->tor ),
2325                   mustCloseCount,
2326                   canCloseCount,
2327                   candidateCount,
2328                   tr_ptrArraySize( &t->pool ),
2329                   MAX_RECONNECTIONS_PER_PULSE );
2330
2331        /* disconnect the really bad peers */
2332        for( i=0; i<mustCloseCount; ++i )
2333            closePeer( t, mustClose[i] );
2334
2335        /* decide how many peers can we try to add in this pass */
2336        maxCandidates = candidateCount;
2337        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2338        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2339        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2340
2341        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2342        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2343            closePeer( t, canClose[i] );
2344
2345        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2346                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2347                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2348                   candidateCount,
2349                   MAX_RECONNECTIONS_PER_PULSE,
2350                   getPeerCount( t ),
2351                   getMaxPeerCount( t->tor ),
2352                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2353
2354        /* add some new ones */
2355        for( i=0; i<maxCandidates; ++i )
2356        {
2357            tr_peerMgr        * mgr = t->manager;
2358            struct peer_atom  * atom = candidates[i];
2359            tr_peerIo         * io;
2360
2361            tordbg( t, "Starting an OUTGOING connection with %s",
2362                   tr_atomAddrStr( atom ) );
2363
2364            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2365
2366            if( io == NULL )
2367            {
2368                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2369                        tr_atomAddrStr( atom ) );
2370                atom->myflags |= MYFLAG_UNREACHABLE;
2371            }
2372            else
2373            {
2374                tr_handshake * handshake = tr_handshakeNew( io,
2375                                                            mgr->session->encryptionMode,
2376                                                            myHandshakeDoneCB,
2377                                                            mgr );
2378
2379                assert( tr_peerIoGetTorrentHash( io ) );
2380
2381                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2382
2383                ++newConnectionsThisSecond;
2384
2385                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2386                                         handshakeCompare );
2387            }
2388
2389            atom->time = time( NULL );
2390        }
2391
2392        /* cleanup */
2393        tr_free( candidates );
2394        tr_free( mustClose );
2395        tr_free( canClose );
2396    }
2397}
2398
2399struct peer_liveliness
2400{
2401    tr_peer * peer;
2402    void * clientData;
2403    time_t pieceDataTime;
2404    time_t time;
2405    int speed;
2406    tr_bool doPurge;
2407};
2408
2409static int
2410comparePeerLiveliness( const void * va, const void * vb )
2411{
2412    const struct peer_liveliness * a = va;
2413    const struct peer_liveliness * b = vb;
2414
2415    if( a->doPurge != b->doPurge )
2416        return a->doPurge ? 1 : -1;
2417
2418    if( a->speed != b->speed ) /* faster goes first */
2419        return a->speed > b->speed ? -1 : 1;
2420
2421    /* the one to give us data more recently goes first */
2422    if( a->pieceDataTime != b->pieceDataTime )
2423        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2424
2425    /* the one we connected to most recently goes first */
2426    if( a->time != b->time )
2427        return a->time > b->time ? -1 : 1;
2428
2429    return 0;
2430}
2431
2432static int
2433comparePeerLivelinessReverse( const void * va, const void * vb )
2434{
2435    return -comparePeerLiveliness (va, vb);
2436}
2437
2438static void
2439sortPeersByLivelinessImpl( tr_peer  ** peers,
2440                           void     ** clientData,
2441                           int         n,
2442                           uint64_t    now,
2443                           int (*compare) ( const void *va, const void *vb ) )
2444{
2445    int i;
2446    struct peer_liveliness *lives, *l;
2447
2448    /* build a sortable array of peer + extra info */
2449    lives = l = tr_new0( struct peer_liveliness, n );
2450    for( i=0; i<n; ++i, ++l )
2451    {
2452        tr_peer * p = peers[i];
2453        l->peer = p;
2454        l->doPurge = p->doPurge;
2455        l->pieceDataTime = p->atom->piece_data_time;
2456        l->time = p->atom->time;
2457        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2458                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2459        if( clientData )
2460            l->clientData = clientData[i];
2461    }
2462
2463    /* sort 'em */
2464    assert( n == ( l - lives ) );
2465    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2466
2467    /* build the peer array */
2468    for( i=0, l=lives; i<n; ++i, ++l ) {
2469        peers[i] = l->peer;
2470        if( clientData )
2471            clientData[i] = l->clientData;
2472    }
2473    assert( n == ( l - lives ) );
2474
2475    /* cleanup */
2476    tr_free( lives );
2477}
2478
2479static void
2480sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2481{
2482    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2483}
2484
2485static void
2486sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2487{
2488    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2489}
2490
2491
2492static void
2493enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2494{
2495    int n = tr_ptrArraySize( &t->peers );
2496    const int max = tr_torrentGetPeerLimit( t->tor );
2497    if( n > max )
2498    {
2499        void * base = tr_ptrArrayBase( &t->peers );
2500        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2501        sortPeersByLiveliness( peers, NULL, n, now );
2502        while( n > max )
2503            closePeer( t, peers[--n] );
2504        tr_free( peers );
2505    }
2506}
2507
2508static void
2509enforceSessionPeerLimit( tr_session * session, uint64_t now )
2510{
2511    int n = 0;
2512    tr_torrent * tor = NULL;
2513    const int max = tr_sessionGetPeerLimit( session );
2514
2515    /* count the total number of peers */
2516    while(( tor = tr_torrentNext( session, tor )))
2517        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2518
2519    /* if there are too many, prune out the worst */
2520    if( n > max )
2521    {
2522        tr_peer ** peers = tr_new( tr_peer*, n );
2523        Torrent ** torrents = tr_new( Torrent*, n );
2524
2525        /* populate the peer array */
2526        n = 0;
2527        tor = NULL;
2528        while(( tor = tr_torrentNext( session, tor ))) {
2529            int i;
2530            Torrent * t = tor->torrentPeers;
2531            const int tn = tr_ptrArraySize( &t->peers );
2532            for( i=0; i<tn; ++i, ++n ) {
2533                peers[n] = tr_ptrArrayNth( &t->peers, i );
2534                torrents[n] = t;
2535            }
2536        }
2537
2538        /* sort 'em */
2539        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2540
2541        /* cull out the crappiest */
2542        while( n-- > max )
2543            closePeer( torrents[n], peers[n] );
2544
2545        /* cleanup */
2546        tr_free( torrents );
2547        tr_free( peers );
2548    }
2549}
2550
2551
2552static int
2553reconnectPulse( void * vmgr )
2554{
2555    tr_torrent * tor;
2556    tr_peerMgr * mgr = vmgr;
2557    uint64_t now;
2558    managerLock( mgr );
2559
2560    now = tr_date( );
2561
2562    /* if we're over the per-torrent peer limits, cull some peers */
2563    tor = NULL;
2564    while(( tor = tr_torrentNext( mgr->session, tor )))
2565        if( tor->isRunning )
2566            enforceTorrentPeerLimit( tor->torrentPeers, now );
2567
2568    /* if we're over the per-session peer limits, cull some peers */
2569    enforceSessionPeerLimit( mgr->session, now );
2570
2571    tor = NULL;
2572    while(( tor = tr_torrentNext( mgr->session, tor )))
2573        if( tor->isRunning )
2574            reconnectTorrent( tor->torrentPeers );
2575
2576    managerUnlock( mgr );
2577    return TRUE;
2578}
2579
2580/****
2581*****
2582*****  BANDWIDTH ALLOCATION
2583*****
2584****/
2585
2586static void
2587pumpAllPeers( tr_peerMgr * mgr )
2588{
2589    tr_torrent * tor = NULL;
2590
2591    while(( tor = tr_torrentNext( mgr->session, tor )))
2592    {
2593        int j;
2594        Torrent * t = tor->torrentPeers;
2595
2596        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2597        {
2598            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2599            tr_peerMsgsPulse( peer->msgs );
2600        }
2601    }
2602}
2603
2604static int
2605bandwidthPulse( void * vmgr )
2606{
2607    tr_torrent * tor = NULL;
2608    tr_peerMgr * mgr = vmgr;
2609    managerLock( mgr );
2610
2611    /* FIXME: this next line probably isn't necessary... */
2612    pumpAllPeers( mgr );
2613
2614    /* allocate bandwidth to the peers */
2615    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2616    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2617
2618    /* possibly stop torrents that have seeded enough */
2619    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2620        if( tor->needsSeedRatioCheck ) {
2621            tor->needsSeedRatioCheck = FALSE;
2622            tr_torrentCheckSeedRatio( tor );
2623        }
2624    }
2625
2626    /* possibly stop torrents that have an error */
2627    tor = NULL;
2628    while(( tor = tr_torrentNext( mgr->session, tor )))
2629        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2630            tr_torrentStop( tor );
2631
2632    managerUnlock( mgr );
2633    return TRUE;
2634}
Note: See TracBrowser for help on using the repository browser.