source: trunk/libtransmission/peer-mgr.c @ 8394

Last change on this file since 8394 was 8394, checked in by charles, 13 years ago

(trunk libT) #2035: Transmission causes wakeups by unnecessary polling

  • Property svn:keywords set to Date Rev Author Id
File size: 70.0 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 8394 2009-05-13 20:54:35Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to age out old piece request lists */
56    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = 500,
60
61    /* when many peers are available, keep idle ones this long */
62    MIN_UPLOAD_IDLE_SECS = ( 30 ),
63
64    /* when few peers are available, keep idle ones this long */
65    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
66
67    /* max # of peers to ask fer per torrent per reconnect pulse */
68    MAX_RECONNECTIONS_PER_PULSE = 16,
69
70    /* max number of peers to ask for per second overall.
71    * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 32,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* amount of time to keep a list of request pieces lying around
78       before it's considered too old and needs to be rebuilt */
79    PIECE_LIST_SHELF_LIFE_SECS = 60,
80
81    /* use for bitwise operations w/peer_atom.myflags */
82    MYFLAG_BANNED = 1,
83
84    /* use for bitwise operations w/peer_atom.myflags */
85    /* unreachable for now... but not banned.
86     * if they try to connect to us it's okay */
87    MYFLAG_UNREACHABLE = 2,
88
89    /* the minimum we'll wait before attempting to reconnect to a peer */
90    MINIMUM_RECONNECT_INTERVAL_SECS = 5
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125};
126
127struct tr_blockIterator
128{
129    time_t expirationDate;
130    struct tr_torrent_peers * t;
131    tr_block_index_t blockIndex, blockCount, *blocks;
132    tr_piece_index_t pieceIndex, pieceCount, *pieces;
133};
134
135typedef struct tr_torrent_peers
136{
137    tr_bool                    isRunning;
138
139    uint8_t                    hash[SHA_DIGEST_LENGTH];
140    int                      * pendingRequestCount;
141    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
142    tr_ptrArray                pool; /* struct peer_atom */
143    tr_ptrArray                peers; /* tr_peer */
144    tr_ptrArray                webseeds; /* tr_webseed */
145    tr_timer                 * refillTimer;
146    tr_torrent               * tor;
147    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
148    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
149
150    struct tr_peerMgr        * manager;
151}
152Torrent;
153
154struct tr_peerMgr
155{
156    tr_session      * session;
157    tr_ptrArray       incomingHandshakes; /* tr_handshake */
158    tr_timer        * bandwidthTimer;
159    tr_timer        * rechokeTimer;
160    tr_timer        * reconnectTimer;
161    tr_timer        * refillUpkeepTimer;
162};
163
164#define tordbg( t, ... ) \
165    do { \
166        if( tr_deepLoggingIsActive( ) ) \
167            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
168    } while( 0 )
169
170#define dbgmsg( ... ) \
171    do { \
172        if( tr_deepLoggingIsActive( ) ) \
173            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
174    } while( 0 )
175
176/**
177***
178**/
179
180static TR_INLINE void
181managerLock( const struct tr_peerMgr * manager )
182{
183    tr_globalLock( manager->session );
184}
185
186static TR_INLINE void
187managerUnlock( const struct tr_peerMgr * manager )
188{
189    tr_globalUnlock( manager->session );
190}
191
192static TR_INLINE void
193torrentLock( Torrent * torrent )
194{
195    managerLock( torrent->manager );
196}
197
198static TR_INLINE void
199torrentUnlock( Torrent * torrent )
200{
201    managerUnlock( torrent->manager );
202}
203
204static TR_INLINE int
205torrentIsLocked( const Torrent * t )
206{
207    return tr_globalIsLocked( t->manager->session );
208}
209
210/**
211***
212**/
213
214static int
215handshakeCompareToAddr( const void * va, const void * vb )
216{
217    const tr_handshake * a = va;
218
219    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
220}
221
222static int
223handshakeCompare( const void * a, const void * b )
224{
225    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
226}
227
228static tr_handshake*
229getExistingHandshake( tr_ptrArray      * handshakes,
230                      const tr_address * addr )
231{
232    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
233}
234
235static int
236comparePeerAtomToAddress( const void * va, const void * vb )
237{
238    const struct peer_atom * a = va;
239
240    return tr_compareAddresses( &a->addr, vb );
241}
242
243static int
244comparePeerAtoms( const void * va, const void * vb )
245{
246    const struct peer_atom * b = vb;
247
248    return comparePeerAtomToAddress( va, &b->addr );
249}
250
251/**
252***
253**/
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
260
261    return tor == NULL ? NULL : tor->torrentPeers;
262}
263
264static int
265peerCompare( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    const tr_peer * b = vb;
269
270    return tr_compareAddresses( &a->addr, &b->addr );
271}
272
273static int
274peerCompareToAddr( const void * va, const void * vb )
275{
276    const tr_peer * a = va;
277
278    return tr_compareAddresses( &a->addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent          * torrent,
283                 const tr_address * addr )
284{
285    assert( torrentIsLocked( torrent ) );
286    assert( addr );
287
288    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
289}
290
291static struct peer_atom*
292getExistingAtom( const Torrent    * t,
293                 const tr_address * addr )
294{
295    Torrent * tt = (Torrent*)t;
296    assert( torrentIsLocked( t ) );
297    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
298}
299
300static tr_bool
301peerIsInUse( const Torrent    * ct,
302             const tr_address * addr )
303{
304    Torrent * t = (Torrent*) ct;
305
306    assert( torrentIsLocked ( t ) );
307
308    return getExistingPeer( t, addr )
309        || getExistingHandshake( &t->outgoingHandshakes, addr )
310        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
311}
312
313static tr_peer*
314peerConstructor( const tr_address * addr )
315{
316    tr_peer * p;
317    p = tr_new0( tr_peer, 1 );
318    p->addr = *addr;
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent          * torrent,
324         const tr_address * addr )
325{
326    tr_peer * peer;
327
328    assert( torrentIsLocked( torrent ) );
329
330    peer = getExistingPeer( torrent, addr );
331
332    if( peer == NULL )
333    {
334        peer = peerConstructor( addr );
335        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
336    }
337
338    return peer;
339}
340
341static void
342peerDestructor( tr_peer * peer )
343{
344    assert( peer );
345
346    if( peer->msgs != NULL )
347    {
348        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
349        tr_peerMsgsFree( peer->msgs );
350    }
351
352    tr_peerIoClear( peer->io );
353    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
354
355    tr_bitfieldFree( peer->have );
356    tr_bitfieldFree( peer->blame );
357    tr_free( peer->client );
358
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t, tr_peer * peer )
364{
365    tr_peer * removed;
366    struct peer_atom * atom = peer->atom;
367
368    assert( torrentIsLocked( t ) );
369    assert( atom );
370
371    atom->time = time( NULL );
372
373    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
374    assert( removed == peer );
375    peerDestructor( removed );
376}
377
378static void
379removeAllPeers( Torrent * t )
380{
381    while( !tr_ptrArrayEmpty( &t->peers ) )
382        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
383}
384
385static void blockIteratorFree( struct tr_blockIterator ** inout );
386
387static void
388torrentDestructor( void * vt )
389{
390    Torrent * t = vt;
391    uint8_t   hash[SHA_DIGEST_LENGTH];
392
393    assert( t );
394    assert( !t->isRunning );
395    assert( torrentIsLocked( t ) );
396    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
397    assert( tr_ptrArrayEmpty( &t->peers ) );
398
399    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
400
401    tr_timerFree( &t->refillTimer );
402
403    blockIteratorFree( &t->refillQueue );
404    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
405    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
406    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
407    tr_ptrArrayDestruct( &t->peers, NULL );
408
409    tr_free( t->pendingRequestCount );
410    tr_free( t );
411}
412
413static void peerCallbackFunc( void * vpeer,
414                              void * vevent,
415                              void * vt );
416
417static Torrent*
418torrentConstructor( tr_peerMgr * manager,
419                    tr_torrent * tor )
420{
421    int       i;
422    Torrent * t;
423
424    t = tr_new0( Torrent, 1 );
425    t->manager = manager;
426    t->tor = tor;
427    t->pool = TR_PTR_ARRAY_INIT;
428    t->peers = TR_PTR_ARRAY_INIT;
429    t->webseeds = TR_PTR_ARRAY_INIT;
430    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
431    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
432
433    for( i = 0; i < tor->info.webseedCount; ++i )
434    {
435        tr_webseed * w =
436            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
437        tr_ptrArrayAppend( &t->webseeds, w );
438    }
439
440    return t;
441}
442
443
444static int bandwidthPulse ( void * vmgr );
445static int rechokePulse   ( void * vmgr );
446static int reconnectPulse ( void * vmgr );
447static int refillUpkeep   ( void * vmgr );
448
449tr_peerMgr*
450tr_peerMgrNew( tr_session * session )
451{
452    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
453    m->session = session;
454    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
455    return m;
456}
457
458static void
459deleteTimers( struct tr_peerMgr * m )
460{
461    if( m->bandwidthTimer )
462        tr_timerFree( &m->bandwidthTimer );
463
464    if( m->rechokeTimer == NULL )
465        tr_timerFree( &m->rechokeTimer );
466
467    if( m->reconnectTimer == NULL )
468        tr_timerFree( &m->reconnectTimer );
469
470    if( m->refillUpkeepTimer == NULL )
471        tr_timerFree( &m->refillUpkeepTimer );
472}
473
474void
475tr_peerMgrFree( tr_peerMgr * manager )
476{
477    managerLock( manager );
478
479    deleteTimers( manager );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
485
486    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
487
488    managerUnlock( manager );
489    tr_free( manager );
490}
491
492static int
493clientIsDownloadingFrom( const tr_peer * peer )
494{
495    return peer->clientIsInterested && !peer->clientIsChoked;
496}
497
498static int
499clientIsUploadingTo( const tr_peer * peer )
500{
501    return peer->peerIsInterested && !peer->peerIsChoked;
502}
503
504/***
505****
506***/
507
508tr_bool
509tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
510                      const tr_address  * addr )
511{
512    tr_bool isSeed = FALSE;
513    const Torrent * t = tor->torrentPeers;
514    const struct peer_atom * atom = getExistingAtom( t, addr );
515
516    if( atom )
517        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
518
519    return isSeed;
520}
521
522/****
523*****
524*****  REFILL
525*****
526****/
527
528static void
529assertValidPiece( Torrent * t, tr_piece_index_t piece )
530{
531    assert( t );
532    assert( t->tor );
533    assert( piece < t->tor->info.pieceCount );
534}
535
536static int
537getPieceRequests( Torrent * t, tr_piece_index_t piece )
538{
539    assertValidPiece( t, piece );
540
541    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
542}
543
544static void
545incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
546{
547    assertValidPiece( t, piece );
548
549    if( t->pendingRequestCount == NULL )
550        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
551    t->pendingRequestCount[piece]++;
552}
553
554static void
555decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
556{
557    assertValidPiece( t, piece );
558
559    if( t->pendingRequestCount )
560        t->pendingRequestCount[piece]--;
561}
562
563struct tr_refill_piece
564{
565    tr_priority_t    priority;
566    uint32_t         piece;
567    uint32_t         peerCount;
568    int              random;
569    int              pendingRequestCount;
570    int              missingBlockCount;
571};
572
573static int
574compareRefillPiece( const void * aIn, const void * bIn )
575{
576    const struct tr_refill_piece * a = aIn;
577    const struct tr_refill_piece * b = bIn;
578
579    /* if one piece has a higher priority, it goes first */
580    if( a->priority != b->priority )
581        return a->priority > b->priority ? -1 : 1;
582
583    /* have a per-priority endgame */
584    if( a->pendingRequestCount != b->pendingRequestCount )
585        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
586
587    /* fewer missing pieces goes first */
588    if( a->missingBlockCount != b->missingBlockCount )
589        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
590
591    /* otherwise if one has fewer peers, it goes first */
592    if( a->peerCount != b->peerCount )
593        return a->peerCount < b->peerCount ? -1 : 1;
594
595    /* otherwise go with our random seed */
596    if( a->random != b->random )
597        return a->random < b->random ? -1 : 1;
598
599    return 0;
600}
601
602static tr_piece_index_t *
603getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
604{
605    const tr_torrent  * tor = t->tor;
606    const tr_info     * inf = &tor->info;
607    tr_piece_index_t    i;
608    tr_piece_index_t    poolSize = 0;
609    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
610    int                 peerCount;
611    const tr_peer    ** peers;
612
613    assert( torrentIsLocked( t ) );
614
615    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
616    peerCount = tr_ptrArraySize( &t->peers );
617
618    /* make a list of the pieces that we want but don't have */
619    for( i = 0; i < inf->pieceCount; ++i )
620        if( !tor->info.pieces[i].dnd
621                && !tr_cpPieceIsComplete( &tor->completion, i ) )
622            pool[poolSize++] = i;
623
624    /* sort the pool by which to request next */
625    if( poolSize > 1 )
626    {
627        tr_piece_index_t j;
628        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
629
630        for( j = 0; j < poolSize; ++j )
631        {
632            int k;
633            const tr_piece_index_t piece = pool[j];
634            struct tr_refill_piece * setme = p + j;
635
636            setme->piece = piece;
637            setme->priority = inf->pieces[piece].priority;
638            setme->peerCount = 0;
639            setme->random = tr_cryptoWeakRandInt( INT_MAX );
640            setme->pendingRequestCount = getPieceRequests( t, piece );
641            setme->missingBlockCount
642                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
643
644            for( k = 0; k < peerCount; ++k )
645            {
646                const tr_peer * peer = peers[k];
647                if( peer->peerIsInterested
648                        && !peer->clientIsChoked
649                        && tr_bitfieldHas( peer->have, piece ) )
650                    ++setme->peerCount;
651            }
652        }
653
654        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
655               compareRefillPiece );
656
657        for( j = 0; j < poolSize; ++j )
658            pool[j] = p[j].piece;
659
660        tr_free( p );
661    }
662
663    *pieceCount = poolSize;
664    return pool;
665}
666
667static struct tr_blockIterator*
668blockIteratorNew( Torrent * t )
669{
670    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
671    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
672    i->t = t;
673    i->pieces = getPreferredPieces( t, &i->pieceCount );
674    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
675    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
676    return i;
677}
678
679static tr_bool
680blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
681{
682    tr_bool found;
683    Torrent * t = i->t;
684    tr_torrent * tor = t->tor;
685
686    while( ( i->blockIndex == i->blockCount )
687        && ( i->pieceIndex < i->pieceCount ) )
688    {
689        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
690        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
691        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
692        tr_block_index_t block;
693
694        assert( index < tor->info.pieceCount );
695
696        i->blockCount = 0;
697        i->blockIndex = 0;
698        for( block=b; block!=e; ++block )
699            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
700                i->blocks[i->blockCount++] = block;
701    }
702
703    assert( i->blockCount <= tor->blockCountInPiece );
704
705    if(( found = ( i->blockIndex < i->blockCount )))
706        *setme = i->blocks[i->blockIndex++];
707
708    return found;
709}
710
711static void
712blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
713{
714    i->blockIndex = i->blockCount;
715}
716
717static void
718blockIteratorFree( struct tr_blockIterator ** inout )
719{
720    struct tr_blockIterator * it = *inout;
721
722    if( it != NULL )
723    {
724        tr_free( it->blocks );
725        tr_free( it->pieces );
726        tr_free( it );
727    }
728
729    *inout = NULL;
730}
731
732static tr_peer**
733getPeersUploadingToClient( Torrent * t,
734                           int *     setmeCount )
735{
736    int j;
737    int peerCount = 0;
738    int retCount = 0;
739    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
740    tr_peer ** ret = tr_new( tr_peer *, peerCount );
741
742    j = 0; /* this is a temporary test to make sure we walk through all the peers */
743    if( peerCount )
744    {
745        /* Get a list of peers we're downloading from.
746           Pick a different starting point each time so all peers
747           get a chance at being the first in line */
748        const int fencepost = tr_cryptoWeakRandInt( peerCount );
749        int i = fencepost;
750        do {
751            if( clientIsDownloadingFrom( peers[i] ) )
752                ret[retCount++] = peers[i];
753            i = ( i + 1 ) % peerCount;
754            ++j;
755        } while( i != fencepost );
756    }
757    assert( j == peerCount );
758    *setmeCount = retCount;
759    return ret;
760}
761
762static uint32_t
763getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
764{
765    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
766    const uint64_t blockPos = tor->blockSize * b;
767    assert( blockPos >= piecePos );
768    return (uint32_t)( blockPos - piecePos );
769}
770
771static int
772refillUpkeep( void * vmgr )
773{
774    tr_torrent * tor = NULL;
775    tr_peerMgr * mgr = vmgr;
776    time_t now;
777    managerLock( mgr );
778
779    now = time( NULL );
780    while(( tor = tr_torrentNext( mgr->session, tor ))) {
781        Torrent * t = tor->torrentPeers;
782        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
783            tordbg( t, "refill queue is past its shelf date; discarding." );
784            blockIteratorFree( &t->refillQueue );
785        }
786    }
787
788    managerUnlock( mgr );
789    return TRUE;
790}
791
792static int
793refillPulse( void * vtorrent )
794{
795    tr_block_index_t block;
796    int peerCount;
797    int webseedCount;
798    tr_peer ** peers;
799    tr_webseed ** webseeds;
800    Torrent * t = vtorrent;
801    tr_torrent * tor = t->tor;
802    tr_bool hasNext = TRUE;
803
804    if( !t->isRunning )
805        return TRUE;
806    if( tr_torrentIsSeed( t->tor ) )
807        return TRUE;
808
809    torrentLock( t );
810    tordbg( t, "Refilling Request Buffers..." );
811
812    if( t->refillQueue == NULL )
813        t->refillQueue = blockIteratorNew( t );
814
815    peers = getPeersUploadingToClient( t, &peerCount );
816    webseedCount = tr_ptrArraySize( &t->webseeds );
817    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
818                          webseedCount * sizeof( tr_webseed* ) );
819
820    while( ( webseedCount || peerCount )
821        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
822    {
823        int j;
824        tr_bool handled = FALSE;
825
826        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
827        const uint32_t offset = getBlockOffsetInPiece( tor, block );
828        const uint32_t length = tr_torBlockCountBytes( tor, block );
829
830        assert( block < tor->blockCount );
831
832        /* find a peer who can ask for this block */
833        for( j=0; !handled && j<peerCount; )
834        {
835            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
836            switch( val )
837            {
838                case TR_ADDREQ_FULL:
839                case TR_ADDREQ_CLIENT_CHOKED:
840                    peers[j] = peers[--peerCount];
841                    break;
842
843                case TR_ADDREQ_MISSING:
844                case TR_ADDREQ_DUPLICATE:
845                    ++j;
846                    break;
847
848                case TR_ADDREQ_OK:
849                    incrementPieceRequests( t, index );
850                    handled = TRUE;
851                    break;
852
853                default:
854                    assert( 0 && "unhandled value" );
855                    break;
856            }
857        }
858
859        /* maybe one of the webseeds can do it */
860        for( j=0; !handled && j<webseedCount; )
861        {
862            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
863            switch( val )
864            {
865                case TR_ADDREQ_FULL:
866                    webseeds[j] = webseeds[--webseedCount];
867                    break;
868
869                case TR_ADDREQ_OK:
870                    incrementPieceRequests( t, index );
871                    handled = TRUE;
872                    break;
873
874                default:
875                    assert( 0 && "unhandled value" );
876                    break;
877            }
878        }
879
880        if( !handled )
881            blockIteratorSkipCurrentPiece( t->refillQueue );
882    }
883
884    /* cleanup */
885    tr_free( webseeds );
886    tr_free( peers );
887
888    if( !hasNext ) {
889        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
890        blockIteratorFree( &t->refillQueue );
891    }
892
893    t->refillTimer = NULL;
894    torrentUnlock( t );
895    return FALSE;
896}
897
898static void
899broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
900{
901    size_t i;
902    size_t peerCount;
903    tr_peer ** peers;
904
905    assert( torrentIsLocked( t ) );
906
907    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
908
909    peerCount = tr_ptrArraySize( &t->peers );
910    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
911    for( i=0; i<peerCount; ++i )
912        if( peers[i]->msgs )
913            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
914}
915
916static void
917addStrike( Torrent * t, tr_peer * peer )
918{
919    tordbg( t, "increasing peer %s strike count to %d",
920            tr_peerIoAddrStr( &peer->addr,
921                              peer->port ), peer->strikes + 1 );
922
923    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
924    {
925        struct peer_atom * atom = peer->atom;
926        atom->myflags |= MYFLAG_BANNED;
927        peer->doPurge = 1;
928        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
929    }
930}
931
932static void
933gotBadPiece( Torrent *        t,
934             tr_piece_index_t pieceIndex )
935{
936    tr_torrent *   tor = t->tor;
937    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
938
939    tor->corruptCur += byteCount;
940    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
941}
942
943static void
944refillSoon( Torrent * t )
945{
946    if( t->refillTimer == NULL )
947        t->refillTimer = tr_timerNew( t->manager->session,
948                                      refillPulse, t,
949                                      REFILL_PERIOD_MSEC );
950}
951
952static void
953peerSuggestedPiece( Torrent            * t UNUSED,
954                    tr_peer            * peer UNUSED,
955                    tr_piece_index_t     pieceIndex UNUSED,
956                    int                  isFastAllowed UNUSED )
957{
958#if 0
959    assert( t );
960    assert( peer );
961    assert( peer->msgs );
962
963    /* is this a valid piece? */
964    if(  pieceIndex >= t->tor->info.pieceCount )
965        return;
966
967    /* don't ask for it if we've already got it */
968    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
969        return;
970
971    /* don't ask for it if they don't have it */
972    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
973        return;
974
975    /* don't ask for it if we're choked and it's not fast */
976    if( !isFastAllowed && peer->clientIsChoked )
977        return;
978
979    /* request the blocks that we don't have in this piece */
980    {
981        tr_block_index_t block;
982        const tr_torrent * tor = t->tor;
983        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
984        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
985
986        for( block=start; block<end; ++block )
987        {
988            if( !tr_cpBlockIsComplete( tor->completion, block ) )
989            {
990                const uint32_t offset = getBlockOffsetInPiece( tor, block );
991                const uint32_t length = tr_torBlockCountBytes( tor, block );
992                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
993                incrementPieceRequests( t, pieceIndex );
994            }
995        }
996    }
997#endif
998}
999
1000static void
1001peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1002{
1003    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1004    Torrent * t = vt;
1005    const tr_peer_event * e = vevent;
1006
1007    torrentLock( t );
1008
1009    switch( e->eventType )
1010    {
1011        case TR_PEER_UPLOAD_ONLY:
1012            /* update our atom */
1013            if( peer )
1014                peer->atom->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1015            break;
1016
1017        case TR_PEER_NEED_REQ:
1018            refillSoon( t );
1019            break;
1020
1021        case TR_PEER_CANCEL:
1022            decrementPieceRequests( t, e->pieceIndex );
1023            break;
1024
1025        case TR_PEER_PEER_GOT_DATA:
1026        {
1027            const time_t now = time( NULL );
1028            tr_torrent * tor = t->tor;
1029
1030            tr_torrentSetActivityDate( tor, now );
1031
1032            if( e->wasPieceData )
1033                tor->uploadedCur += e->length;
1034
1035            /* update the stats */
1036            if( e->wasPieceData )
1037                tr_statsAddUploaded( tor->session, e->length );
1038
1039            /* update our atom */
1040            if( peer && e->wasPieceData )
1041                peer->atom->piece_data_time = now;
1042
1043            tor->needsSeedRatioCheck = TRUE;
1044
1045            break;
1046        }
1047
1048        case TR_PEER_CLIENT_GOT_SUGGEST:
1049            if( peer )
1050                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1051            break;
1052
1053        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1054            if( peer )
1055                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1056            break;
1057
1058        case TR_PEER_CLIENT_GOT_DATA:
1059        {
1060            const time_t now = time( NULL );
1061            tr_torrent * tor = t->tor;
1062
1063            tr_torrentSetActivityDate( tor, now );
1064
1065            /* only add this to downloadedCur if we got it from a peer --
1066             * webseeds shouldn't count against our ratio.  As one tracker
1067             * admin put it, "Those pieces are downloaded directly from the
1068             * content distributor, not the peers, it is the tracker's job
1069             * to manage the swarms, not the web server and does not fit
1070             * into the jurisdiction of the tracker." */
1071            if( peer && e->wasPieceData )
1072                tor->downloadedCur += e->length;
1073
1074            /* update the stats */ 
1075            if( e->wasPieceData )
1076                tr_statsAddDownloaded( tor->session, e->length );
1077
1078            /* update our atom */
1079            if( peer && e->wasPieceData )
1080                peer->atom->piece_data_time = now;
1081
1082            break;
1083        }
1084
1085        case TR_PEER_PEER_PROGRESS:
1086        {
1087            if( peer )
1088            {
1089                struct peer_atom * atom = peer->atom;
1090                const int peerIsSeed = e->progress >= 1.0;
1091                if( peerIsSeed ) {
1092                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1093                    atom->flags |= ADDED_F_SEED_FLAG;
1094                } else {
1095                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1096                    atom->flags &= ~ADDED_F_SEED_FLAG;
1097                }
1098            }
1099            break;
1100        }
1101
1102        case TR_PEER_CLIENT_GOT_BLOCK:
1103        {
1104            tr_torrent * tor = t->tor;
1105
1106            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1107
1108            tr_cpBlockAdd( &tor->completion, block );
1109            decrementPieceRequests( t, e->pieceIndex );
1110
1111            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1112
1113            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1114            {
1115                const tr_piece_index_t p = e->pieceIndex;
1116                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1117
1118                if( !ok )
1119                {
1120                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1121                               (unsigned long)p );
1122                }
1123
1124                tr_torrentSetHasPiece( tor, p, ok );
1125                tr_torrentSetPieceChecked( tor, p, TRUE );
1126                tr_peerMgrSetBlame( tor, p, ok );
1127
1128                if( !ok )
1129                {
1130                    gotBadPiece( t, p );
1131                }
1132                else
1133                {
1134                    int i;
1135                    int peerCount;
1136                    tr_peer ** peers;
1137                    tr_file_index_t fileIndex;
1138
1139                    peerCount = tr_ptrArraySize( &t->peers );
1140                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1141                    for( i=0; i<peerCount; ++i )
1142                        tr_peerMsgsHave( peers[i]->msgs, p );
1143
1144                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1145                    {
1146                        const tr_file * file = &tor->info.files[fileIndex];
1147                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1148                        {
1149                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1150                            tordbg( t, "closing recently-completed file \"%s\"", path );
1151                            tr_fdFileClose( path );
1152                            tr_free( path );
1153                        }
1154                    }
1155                }
1156
1157                tr_torrentRecheckCompleteness( tor );
1158            }
1159            break;
1160        }
1161
1162        case TR_PEER_ERROR:
1163            if( e->err == EINVAL )
1164            {
1165                addStrike( t, peer );
1166                peer->doPurge = 1;
1167                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1168            }
1169            else if( ( e->err == ERANGE )
1170                  || ( e->err == EMSGSIZE )
1171                  || ( e->err == ENOTCONN ) )
1172            {
1173                /* some protocol error from the peer */
1174                peer->doPurge = 1;
1175                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1176            }
1177            else /* a local error, such as an IO error */
1178            {
1179                t->tor->error = e->err;
1180                tr_strlcpy( t->tor->errorString,
1181                            tr_strerror( t->tor->error ),
1182                            sizeof( t->tor->errorString ) );
1183                tr_torrentStop( t->tor );
1184            }
1185            break;
1186
1187        default:
1188            assert( 0 );
1189    }
1190
1191    torrentUnlock( t );
1192}
1193
1194static void
1195ensureAtomExists( Torrent          * t,
1196                  const tr_address * addr,
1197                  tr_port            port,
1198                  uint8_t            flags,
1199                  uint8_t            from )
1200{
1201    if( getExistingAtom( t, addr ) == NULL )
1202    {
1203        struct peer_atom * a;
1204        a = tr_new0( struct peer_atom, 1 );
1205        a->addr = *addr;
1206        a->port = port;
1207        a->flags = flags;
1208        a->from = from;
1209        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1210        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1211    }
1212}
1213
1214static int
1215getMaxPeerCount( const tr_torrent * tor )
1216{
1217    return tor->maxConnectedPeers;
1218}
1219
1220static int
1221getPeerCount( const Torrent * t )
1222{
1223    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1224}
1225
1226/* FIXME: this is kind of a mess. */
1227static tr_bool
1228myHandshakeDoneCB( tr_handshake  * handshake,
1229                   tr_peerIo     * io,
1230                   tr_bool         isConnected,
1231                   const uint8_t * peer_id,
1232                   void          * vmanager )
1233{
1234    tr_bool            ok = isConnected;
1235    tr_bool            success = FALSE;
1236    tr_port            port;
1237    const tr_address * addr;
1238    tr_peerMgr       * manager = vmanager;
1239    Torrent          * t;
1240    tr_handshake     * ours;
1241
1242    assert( io );
1243    assert( tr_isBool( ok ) );
1244
1245    t = tr_peerIoHasTorrentHash( io )
1246        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1247        : NULL;
1248
1249    if( tr_peerIoIsIncoming ( io ) )
1250        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1251                                        handshake, handshakeCompare );
1252    else if( t )
1253        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1254                                        handshake, handshakeCompare );
1255    else
1256        ours = handshake;
1257
1258    assert( ours );
1259    assert( ours == handshake );
1260
1261    if( t )
1262        torrentLock( t );
1263
1264    addr = tr_peerIoGetAddress( io, &port );
1265
1266    if( !ok || !t || !t->isRunning )
1267    {
1268        if( t )
1269        {
1270            struct peer_atom * atom = getExistingAtom( t, addr );
1271            if( atom )
1272                ++atom->numFails;
1273        }
1274    }
1275    else /* looking good */
1276    {
1277        struct peer_atom * atom;
1278        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1279        atom = getExistingAtom( t, addr );
1280        atom->time = time( NULL );
1281        atom->piece_data_time = 0;
1282
1283        if( atom->myflags & MYFLAG_BANNED )
1284        {
1285            tordbg( t, "banned peer %s tried to reconnect",
1286                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1287        }
1288        else if( tr_peerIoIsIncoming( io )
1289               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1290
1291        {
1292        }
1293        else
1294        {
1295            tr_peer * peer = getExistingPeer( t, addr );
1296
1297            if( peer ) /* we already have this peer */
1298            {
1299            }
1300            else
1301            {
1302                peer = getPeer( t, addr );
1303                tr_free( peer->client );
1304
1305                if( !peer_id )
1306                    peer->client = NULL;
1307                else {
1308                    char client[128];
1309                    tr_clientForId( client, sizeof( client ), peer_id );
1310                    peer->client = tr_strdup( client );
1311                }
1312
1313                peer->port = port;
1314                peer->atom = atom;
1315                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1316                                                                balanced by our unref in peerDestructor()  */
1317                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1318                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1319
1320                success = TRUE;
1321            }
1322        }
1323    }
1324
1325    if( t )
1326        torrentUnlock( t );
1327
1328    return success;
1329}
1330
1331void
1332tr_peerMgrAddIncoming( tr_peerMgr * manager,
1333                       tr_address * addr,
1334                       tr_port      port,
1335                       int          socket )
1336{
1337    managerLock( manager );
1338
1339    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1340    {
1341        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1342        tr_netClose( socket );
1343    }
1344    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1345    {
1346        tr_netClose( socket );
1347    }
1348    else /* we don't have a connetion to them yet... */
1349    {
1350        tr_peerIo *    io;
1351        tr_handshake * handshake;
1352
1353        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1354
1355        handshake = tr_handshakeNew( io,
1356                                     manager->session->encryptionMode,
1357                                     myHandshakeDoneCB,
1358                                     manager );
1359
1360        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1361
1362        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1363                                 handshakeCompare );
1364    }
1365
1366    managerUnlock( manager );
1367}
1368
1369static tr_bool
1370tr_isPex( const tr_pex * pex )
1371{
1372    return pex && tr_isAddress( &pex->addr );
1373}
1374
1375void
1376tr_peerMgrAddPex( tr_torrent   *  tor,
1377                  uint8_t         from,
1378                  const tr_pex *  pex )
1379{
1380    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1381    {
1382        Torrent * t = tor->torrentPeers;
1383        managerLock( t->manager );
1384
1385        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1386            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1387                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1388
1389        managerUnlock( t->manager );
1390    }
1391}
1392
1393tr_pex *
1394tr_peerMgrCompactToPex( const void *    compact,
1395                        size_t          compactLen,
1396                        const uint8_t * added_f,
1397                        size_t          added_f_len,
1398                        size_t *        pexCount )
1399{
1400    size_t          i;
1401    size_t          n = compactLen / 6;
1402    const uint8_t * walk = compact;
1403    tr_pex *        pex = tr_new0( tr_pex, n );
1404
1405    for( i = 0; i < n; ++i )
1406    {
1407        pex[i].addr.type = TR_AF_INET;
1408        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1409        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1410        if( added_f && ( n == added_f_len ) )
1411            pex[i].flags = added_f[i];
1412    }
1413
1414    *pexCount = n;
1415    return pex;
1416}
1417
1418tr_pex *
1419tr_peerMgrCompact6ToPex( const void    * compact,
1420                         size_t          compactLen,
1421                         const uint8_t * added_f,
1422                         size_t          added_f_len,
1423                         size_t        * pexCount )
1424{
1425    size_t          i;
1426    size_t          n = compactLen / 18;
1427    const uint8_t * walk = compact;
1428    tr_pex *        pex = tr_new0( tr_pex, n );
1429   
1430    for( i = 0; i < n; ++i )
1431    {
1432        pex[i].addr.type = TR_AF_INET6;
1433        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1434        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1435        if( added_f && ( n == added_f_len ) )
1436            pex[i].flags = added_f[i];
1437    }
1438   
1439    *pexCount = n;
1440    return pex;
1441}
1442
1443tr_pex *
1444tr_peerMgrArrayToPex( const void * array,
1445                      size_t       arrayLen,
1446                      size_t      * pexCount )
1447{
1448    size_t          i;
1449    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1450    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1451    const uint8_t * walk = array;
1452    tr_pex        * pex = tr_new0( tr_pex, n );
1453   
1454    for( i = 0 ; i < n ; i++ ) {
1455        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1456        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1457        pex[i].flags = 0x00;
1458        walk += sizeof( tr_address ) + 2;
1459    }
1460   
1461    *pexCount = n;
1462    return pex;
1463}
1464
1465/**
1466***
1467**/
1468
1469void
1470tr_peerMgrSetBlame( tr_torrent     * tor,
1471                    tr_piece_index_t pieceIndex,
1472                    int              success )
1473{
1474    if( !success )
1475    {
1476        int        peerCount, i;
1477        Torrent *  t = tor->torrentPeers;
1478        tr_peer ** peers;
1479
1480        assert( torrentIsLocked( t ) );
1481
1482        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1483        for( i = 0; i < peerCount; ++i )
1484        {
1485            tr_peer * peer = peers[i];
1486            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1487            {
1488                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1489                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1490                        pieceIndex, (int)peer->strikes + 1 );
1491                addStrike( t, peer );
1492            }
1493        }
1494    }
1495}
1496
1497int
1498tr_pexCompare( const void * va, const void * vb )
1499{
1500    const tr_pex * a = va;
1501    const tr_pex * b = vb;
1502    int i;
1503
1504    assert( tr_isPex( a ) );
1505    assert( tr_isPex( b ) );
1506
1507    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1508        return i;
1509
1510    if( a->port != b->port )
1511        return a->port < b->port ? -1 : 1;
1512
1513    return 0;
1514}
1515
1516static int
1517peerPrefersCrypto( const tr_peer * peer )
1518{
1519    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1520        return TRUE;
1521
1522    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1523        return FALSE;
1524
1525    return tr_peerIoIsEncrypted( peer->io );
1526}
1527
1528int
1529tr_peerMgrGetPeers( tr_torrent      * tor,
1530                    tr_pex         ** setme_pex,
1531                    uint8_t           af)
1532{
1533    int peersReturning = 0;
1534    const Torrent * t = tor->torrentPeers;
1535
1536    managerLock( t->manager );
1537
1538    {
1539        int i;
1540        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1541        const int peerCount = tr_ptrArraySize( &t->peers );
1542        /* for now, this will waste memory on torrents that have both
1543         * ipv6 and ipv4 peers */
1544        tr_pex * pex = tr_new( tr_pex, peerCount );
1545        tr_pex * walk = pex;
1546
1547        for( i=0; i<peerCount; ++i )
1548        {
1549            const tr_peer * peer = peers[i];
1550            if( peer->addr.type == af )
1551            {
1552                const struct peer_atom * atom = peer->atom;
1553
1554                assert( tr_isAddress( &peer->addr ) );
1555                walk->addr = peer->addr;
1556                walk->port = peer->port;
1557                walk->flags = 0;
1558                if( peerPrefersCrypto( peer ) )
1559                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1560                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1561                    walk->flags |= ADDED_F_SEED_FLAG;
1562                ++peersReturning;
1563                ++walk;
1564            }
1565        }
1566
1567        assert( ( walk - pex ) == peersReturning );
1568        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1569        *setme_pex = pex;
1570    }
1571
1572    managerUnlock( t->manager );
1573    return peersReturning;
1574}
1575
1576static void
1577ensureMgrTimersExist( struct tr_peerMgr * m )
1578{
1579    tr_session * s = m->session;
1580
1581    if( m->bandwidthTimer == NULL )
1582        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1583
1584    if( m->rechokeTimer == NULL )
1585        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1586
1587    if( m->reconnectTimer == NULL )
1588        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1589
1590    if( m->refillUpkeepTimer == NULL )
1591        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1592}
1593
1594void
1595tr_peerMgrStartTorrent( tr_torrent * tor )
1596{
1597    Torrent * t = tor->torrentPeers;
1598
1599    assert( t != NULL );
1600    managerLock( t->manager );
1601    ensureMgrTimersExist( t->manager );
1602
1603    if( !t->isRunning )
1604    {
1605        t->isRunning = TRUE;
1606
1607        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1608            refillSoon( t );
1609    }
1610
1611    rechokePulse( t->manager );
1612    managerUnlock( t->manager );
1613}
1614
1615static void
1616stopTorrent( Torrent * t )
1617{
1618    assert( torrentIsLocked( t ) );
1619
1620    t->isRunning = FALSE;
1621
1622    /* disconnect the peers. */
1623    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1624    tr_ptrArrayClear( &t->peers );
1625
1626    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1627     * which removes the handshake from t->outgoingHandshakes... */
1628    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1629        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1630}
1631
1632void
1633tr_peerMgrStopTorrent( tr_torrent * tor )
1634{
1635    Torrent * t = tor->torrentPeers;
1636
1637    managerLock( t->manager );
1638
1639    stopTorrent( t );
1640
1641    managerUnlock( t->manager );
1642}
1643
1644void
1645tr_peerMgrAddTorrent( tr_peerMgr * manager,
1646                      tr_torrent * tor )
1647{
1648    managerLock( manager );
1649
1650    assert( tor );
1651    assert( tor->torrentPeers == NULL );
1652
1653    tor->torrentPeers = torrentConstructor( manager, tor );
1654
1655    managerUnlock( manager );
1656}
1657
1658void
1659tr_peerMgrRemoveTorrent( tr_torrent * tor )
1660{
1661    tr_torrentLock( tor );
1662
1663    stopTorrent( tor->torrentPeers );
1664    torrentDestructor( tor->torrentPeers );
1665
1666    tr_torrentUnlock( tor );
1667}
1668
1669void
1670tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1671                               int8_t           * tab,
1672                               unsigned int       tabCount )
1673{
1674    tr_piece_index_t   i;
1675    const Torrent *    t;
1676    float              interval;
1677    tr_bool            isSeed;
1678    int                peerCount;
1679    const tr_peer **   peers;
1680    tr_torrentLock( tor );
1681
1682    t = tor->torrentPeers;
1683    tor = t->tor;
1684    interval = tor->info.pieceCount / (float)tabCount;
1685    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1686    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1687    peerCount = tr_ptrArraySize( &t->peers );
1688
1689    memset( tab, 0, tabCount );
1690
1691    for( i = 0; tor && i < tabCount; ++i )
1692    {
1693        const int piece = i * interval;
1694
1695        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1696            tab[i] = -1;
1697        else if( peerCount ) {
1698            int j;
1699            for( j = 0; j < peerCount; ++j )
1700                if( tr_bitfieldHas( peers[j]->have, i ) )
1701                    ++tab[i];
1702        }
1703    }
1704
1705    tr_torrentUnlock( tor );
1706}
1707
1708/* Returns the pieces that are available from peers */
1709tr_bitfield*
1710tr_peerMgrGetAvailable( const tr_torrent * tor )
1711{
1712    int i;
1713    int peerCount;
1714    Torrent * t = tor->torrentPeers;
1715    const tr_peer ** peers;
1716    tr_bitfield * pieces;
1717    managerLock( t->manager );
1718
1719    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1720    peerCount = tr_ptrArraySize( &t->peers );
1721    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1722    for( i=0; i<peerCount; ++i )
1723        tr_bitfieldOr( pieces, peers[i]->have );
1724
1725    managerUnlock( t->manager );
1726    return pieces;
1727}
1728
1729void
1730tr_peerMgrTorrentStats( tr_torrent       * tor,
1731                        int              * setmePeersKnown,
1732                        int              * setmePeersConnected,
1733                        int              * setmeSeedsConnected,
1734                        int              * setmeWebseedsSendingToUs,
1735                        int              * setmePeersSendingToUs,
1736                        int              * setmePeersGettingFromUs,
1737                        int              * setmePeersFrom )
1738{
1739    int i, size;
1740    const Torrent * t = tor->torrentPeers;
1741    const tr_peer ** peers;
1742    const tr_webseed ** webseeds;
1743
1744    managerLock( t->manager );
1745
1746    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1747    size = tr_ptrArraySize( &t->peers );
1748
1749    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1750    *setmePeersConnected       = 0;
1751    *setmeSeedsConnected       = 0;
1752    *setmePeersGettingFromUs   = 0;
1753    *setmePeersSendingToUs     = 0;
1754    *setmeWebseedsSendingToUs  = 0;
1755
1756    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1757        setmePeersFrom[i] = 0;
1758
1759    for( i=0; i<size; ++i )
1760    {
1761        const tr_peer * peer = peers[i];
1762        const struct peer_atom * atom = peer->atom;
1763
1764        if( peer->io == NULL ) /* not connected */
1765            continue;
1766
1767        ++*setmePeersConnected;
1768
1769        ++setmePeersFrom[atom->from];
1770
1771        if( clientIsDownloadingFrom( peer ) )
1772            ++*setmePeersSendingToUs;
1773
1774        if( clientIsUploadingTo( peer ) )
1775            ++*setmePeersGettingFromUs;
1776
1777        if( atom->flags & ADDED_F_SEED_FLAG )
1778            ++*setmeSeedsConnected;
1779    }
1780
1781    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1782    size = tr_ptrArraySize( &t->webseeds );
1783    for( i=0; i<size; ++i )
1784        if( tr_webseedIsActive( webseeds[i] ) )
1785            ++*setmeWebseedsSendingToUs;
1786
1787    managerUnlock( t->manager );
1788}
1789
1790float*
1791tr_peerMgrWebSpeeds( const tr_torrent * tor )
1792{
1793    const Torrent * t = tor->torrentPeers;
1794    const tr_webseed ** webseeds;
1795    int i;
1796    int webseedCount;
1797    float * ret;
1798    uint64_t now;
1799
1800    assert( t->manager );
1801    managerLock( t->manager );
1802
1803    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1804    webseedCount = tr_ptrArraySize( &t->webseeds );
1805    assert( webseedCount == tor->info.webseedCount );
1806    ret = tr_new0( float, webseedCount );
1807    now = tr_date( );
1808
1809    for( i=0; i<webseedCount; ++i )
1810        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1811            ret[i] = -1.0;
1812
1813    managerUnlock( t->manager );
1814    return ret;
1815}
1816
1817double
1818tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1819{
1820    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1821}
1822
1823
1824struct tr_peer_stat *
1825tr_peerMgrPeerStats( const tr_torrent    * tor,
1826                     int                 * setmeCount )
1827{
1828    int i, size;
1829    const Torrent * t = tor->torrentPeers;
1830    const tr_peer ** peers;
1831    tr_peer_stat * ret;
1832    uint64_t now;
1833
1834    assert( t->manager );
1835    managerLock( t->manager );
1836
1837    size = tr_ptrArraySize( &t->peers );
1838    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1839    ret = tr_new0( tr_peer_stat, size );
1840    now = tr_date( );
1841
1842    for( i=0; i<size; ++i )
1843    {
1844        char *                   pch;
1845        const tr_peer *          peer = peers[i];
1846        const struct peer_atom * atom = peer->atom;
1847        tr_peer_stat *           stat = ret + i;
1848
1849        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1850        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1851                   sizeof( stat->client ) );
1852        stat->port               = ntohs( peer->port );
1853        stat->from               = atom->from;
1854        stat->progress           = peer->progress;
1855        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1856        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1857        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1858        stat->peerIsChoked       = peer->peerIsChoked;
1859        stat->peerIsInterested   = peer->peerIsInterested;
1860        stat->clientIsChoked     = peer->clientIsChoked;
1861        stat->clientIsInterested = peer->clientIsInterested;
1862        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1863        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1864        stat->isUploadingTo      = clientIsUploadingTo( peer );
1865        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1866
1867        pch = stat->flagStr;
1868        if( t->optimistic == peer ) *pch++ = 'O';
1869        if( stat->isDownloadingFrom ) *pch++ = 'D';
1870        else if( stat->clientIsInterested ) *pch++ = 'd';
1871        if( stat->isUploadingTo ) *pch++ = 'U';
1872        else if( stat->peerIsInterested ) *pch++ = 'u';
1873        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1874        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1875        if( stat->isEncrypted ) *pch++ = 'E';
1876        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1877        if( stat->isIncoming ) *pch++ = 'I';
1878        *pch = '\0';
1879    }
1880
1881    *setmeCount = size;
1882
1883    managerUnlock( t->manager );
1884    return ret;
1885}
1886
1887/**
1888***
1889**/
1890
1891struct ChokeData
1892{
1893    tr_bool         doUnchoke;
1894    tr_bool         isInterested;
1895    tr_bool         isChoked;
1896    int             rate;
1897    tr_peer *       peer;
1898};
1899
1900static int
1901compareChoke( const void * va,
1902              const void * vb )
1903{
1904    const struct ChokeData * a = va;
1905    const struct ChokeData * b = vb;
1906
1907    if( a->rate != b->rate ) /* prefer higher overall speeds */
1908        return a->rate > b->rate ? -1 : 1;
1909
1910    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1911        return a->isChoked ? 1 : -1;
1912
1913    return 0;
1914}
1915
1916static int
1917isNew( const tr_peer * peer )
1918{
1919    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1920}
1921
1922static int
1923isSame( const tr_peer * peer )
1924{
1925    return peer && peer->client && strstr( peer->client, "Transmission" );
1926}
1927
1928/**
1929***
1930**/
1931
1932static void
1933rechokeTorrent( Torrent * t )
1934{
1935    int i, size, unchokedInterested;
1936    const int peerCount = tr_ptrArraySize( &t->peers );
1937    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1938    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1939    const tr_session * session = t->manager->session;
1940    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1941    const uint64_t now = tr_date( );
1942
1943    assert( torrentIsLocked( t ) );
1944
1945    /* sort the peers by preference and rate */
1946    for( i = 0, size = 0; i < peerCount; ++i )
1947    {
1948        tr_peer * peer = peers[i];
1949        struct peer_atom * atom = peer->atom;
1950
1951        if( peer->progress >= 1.0 ) /* choke all seeds */
1952        {
1953            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1954        }
1955        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1956        {
1957            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1958        }
1959        else if( chokeAll ) /* choke everyone if we're not uploading */
1960        {
1961            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1962        }
1963        else
1964        {
1965            struct ChokeData * n = &choke[size++];
1966            n->peer         = peer;
1967            n->isInterested = peer->peerIsInterested;
1968            n->isChoked     = peer->peerIsChoked;
1969            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1970        }
1971    }
1972
1973    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1974
1975    /**
1976     * Reciprocation and number of uploads capping is managed by unchoking
1977     * the N peers which have the best upload rate and are interested.
1978     * This maximizes the client's download rate. These N peers are
1979     * referred to as downloaders, because they are interested in downloading
1980     * from the client.
1981     *
1982     * Peers which have a better upload rate (as compared to the downloaders)
1983     * but aren't interested get unchoked. If they become interested, the
1984     * downloader with the worst upload rate gets choked. If a client has
1985     * a complete file, it uses its upload rate rather than its download
1986     * rate to decide which peers to unchoke.
1987     */
1988    unchokedInterested = 0;
1989    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1990        choke[i].doUnchoke = 1;
1991        if( choke[i].isInterested )
1992            ++unchokedInterested;
1993    }
1994
1995    /* optimistic unchoke */
1996    if( i < size )
1997    {
1998        int n;
1999        struct ChokeData * c;
2000        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2001
2002        for( ; i<size; ++i )
2003        {
2004            if( choke[i].isInterested )
2005            {
2006                const tr_peer * peer = choke[i].peer;
2007                int x = 1, y;
2008                if( isNew( peer ) ) x *= 3;
2009                if( isSame( peer ) ) x *= 3;
2010                for( y=0; y<x; ++y )
2011                    tr_ptrArrayAppend( &randPool, &choke[i] );
2012            }
2013        }
2014
2015        if(( n = tr_ptrArraySize( &randPool )))
2016        {
2017            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2018            c->doUnchoke = 1;
2019            t->optimistic = c->peer;
2020        }
2021
2022        tr_ptrArrayDestruct( &randPool, NULL );
2023    }
2024
2025    for( i=0; i<size; ++i )
2026        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2027
2028    /* cleanup */
2029    tr_free( choke );
2030}
2031
2032static int
2033rechokePulse( void * vmgr )
2034{
2035    tr_torrent * tor = NULL;
2036    tr_peerMgr * mgr = vmgr;
2037    managerLock( mgr );
2038
2039    while(( tor = tr_torrentNext( mgr->session, tor )))
2040        if( tor->isRunning )
2041            rechokeTorrent( tor->torrentPeers );
2042
2043    managerUnlock( mgr );
2044    return TRUE;
2045}
2046
2047/***
2048****
2049****  Life and Death
2050****
2051***/
2052
2053typedef enum
2054{
2055    TR_CAN_KEEP,
2056    TR_CAN_CLOSE,
2057    TR_MUST_CLOSE,
2058}
2059tr_close_type_t;
2060
2061static tr_close_type_t
2062shouldPeerBeClosed( const Torrent    * t,
2063                    const tr_peer    * peer,
2064                    int                peerCount )
2065{
2066    const tr_torrent *       tor = t->tor;
2067    const time_t             now = time( NULL );
2068    const struct peer_atom * atom = peer->atom;
2069
2070    /* if it's marked for purging, close it */
2071    if( peer->doPurge )
2072    {
2073        tordbg( t, "purging peer %s because its doPurge flag is set",
2074                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2075        return TR_MUST_CLOSE;
2076    }
2077
2078    /* if we're seeding and the peer has everything we have,
2079     * and enough time has passed for a pex exchange, then disconnect */
2080    if( tr_torrentIsSeed( tor ) )
2081    {
2082        int peerHasEverything;
2083        if( atom->flags & ADDED_F_SEED_FLAG )
2084            peerHasEverything = TRUE;
2085        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2086            peerHasEverything = FALSE;
2087        else {
2088            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2089            tr_bitfieldDifference( tmp, peer->have );
2090            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2091            tr_bitfieldFree( tmp );
2092        }
2093
2094        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2095        {
2096            tordbg( t, "purging peer %s because we're both seeds",
2097                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2098            return TR_MUST_CLOSE;
2099        }
2100    }
2101
2102    /* disconnect if it's been too long since piece data has been transferred.
2103     * this is on a sliding scale based on number of available peers... */
2104    {
2105        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2106        /* if we have >= relaxIfFewerThan, strictness is 100%.
2107         * if we have zero connections, strictness is 0% */
2108        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2109                               ? 1.0
2110                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2111        const int lo = MIN_UPLOAD_IDLE_SECS;
2112        const int hi = MAX_UPLOAD_IDLE_SECS;
2113        const int limit = hi - ( ( hi - lo ) * strictness );
2114        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2115/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2116        if( idleTime > limit ) {
2117            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2118                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2119            return TR_CAN_CLOSE;
2120        }
2121    }
2122
2123    return TR_CAN_KEEP;
2124}
2125
2126static tr_peer **
2127getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2128{
2129    int i, peerCount, outsize;
2130    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2131    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2132
2133    assert( torrentIsLocked( t ) );
2134
2135    for( i = outsize = 0; i < peerCount; ++i )
2136        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2137            ret[outsize++] = peers[i];
2138
2139    *setmeSize = outsize;
2140    return ret;
2141}
2142
2143static int
2144compareCandidates( const void * va,
2145                   const void * vb )
2146{
2147    const struct peer_atom * a = *(const struct peer_atom**) va;
2148    const struct peer_atom * b = *(const struct peer_atom**) vb;
2149
2150    /* <Charles> Here we would probably want to try reconnecting to
2151     * peers that had most recently given us data. Lots of users have
2152     * trouble with resets due to their routers and/or ISPs. This way we
2153     * can quickly recover from an unwanted reset. So we sort
2154     * piece_data_time in descending order.
2155     */
2156
2157    if( a->piece_data_time != b->piece_data_time )
2158        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2159
2160    if( a->numFails != b->numFails )
2161        return a->numFails < b->numFails ? -1 : 1;
2162
2163    if( a->time != b->time )
2164        return a->time < b->time ? -1 : 1;
2165
2166    /* all other things being equal, prefer peers whose
2167     * information comes from a more reliable source */
2168    if( a->from != b->from )
2169        return a->from < b->from ? -1 : 1;
2170
2171    return 0;
2172}
2173
2174static int
2175getReconnectIntervalSecs( const struct peer_atom * atom )
2176{
2177    int          sec;
2178    const time_t now = time( NULL );
2179
2180    /* if we were recently connected to this peer and transferring piece
2181     * data, try to reconnect to them sooner rather that later -- we don't
2182     * want network troubles to get in the way of a good peer. */
2183    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2184        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2185
2186    /* don't allow reconnects more often than our minimum */
2187    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2188        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2189
2190    /* otherwise, the interval depends on how many times we've tried
2191     * and failed to connect to the peer */
2192    else switch( atom->numFails ) {
2193        case 0: sec = 0; break;
2194        case 1: sec = 5; break;
2195        case 2: sec = 2 * 60; break;
2196        case 3: sec = 15 * 60; break;
2197        case 4: sec = 30 * 60; break;
2198        case 5: sec = 60 * 60; break;
2199        default: sec = 120 * 60; break;
2200    }
2201
2202    return sec;
2203}
2204
2205static struct peer_atom **
2206getPeerCandidates( Torrent * t, int * setmeSize )
2207{
2208    int                 i, atomCount, retCount;
2209    struct peer_atom ** atoms;
2210    struct peer_atom ** ret;
2211    const time_t        now = time( NULL );
2212    const int           seed = tr_torrentIsSeed( t->tor );
2213
2214    assert( torrentIsLocked( t ) );
2215
2216    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2217    ret = tr_new( struct peer_atom*, atomCount );
2218    for( i = retCount = 0; i < atomCount; ++i )
2219    {
2220        int                interval;
2221        struct peer_atom * atom = atoms[i];
2222
2223        /* peer fed us too much bad data ... we only keep it around
2224         * now to weed it out in case someone sends it to us via pex */
2225        if( atom->myflags & MYFLAG_BANNED )
2226            continue;
2227
2228        /* peer was unconnectable before, so we're not going to keep trying.
2229         * this is needs a separate flag from `banned', since if they try
2230         * to connect to us later, we'll let them in */
2231        if( atom->myflags & MYFLAG_UNREACHABLE )
2232            continue;
2233
2234        /* we don't need two connections to the same peer... */
2235        if( peerIsInUse( t, &atom->addr ) )
2236            continue;
2237
2238        /* no need to connect if we're both seeds... */
2239        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2240                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2241            continue;
2242
2243        /* don't reconnect too often */
2244        interval = getReconnectIntervalSecs( atom );
2245        if( ( now - atom->time ) < interval )
2246        {
2247            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2248                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2249            continue;
2250        }
2251
2252        /* Don't connect to peers in our blocklist */
2253        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2254            continue;
2255
2256        ret[retCount++] = atom;
2257    }
2258
2259    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2260    *setmeSize = retCount;
2261    return ret;
2262}
2263
2264static void
2265closePeer( Torrent * t, tr_peer * peer )
2266{
2267    struct peer_atom * atom;
2268
2269    assert( t != NULL );
2270    assert( peer != NULL );
2271
2272    atom = peer->atom;
2273
2274    /* if we transferred piece data, then they might be good peers,
2275       so reset their `numFails' weight to zero.  otherwise we connected
2276       to them fruitlessly, so mark it as another fail */
2277    if( atom->piece_data_time )
2278        atom->numFails = 0;
2279    else
2280        ++atom->numFails;
2281
2282    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2283    removePeer( t, peer );
2284}
2285
2286static void
2287reconnectTorrent( Torrent * t )
2288{
2289    static time_t prevTime = 0;
2290    static int    newConnectionsThisSecond = 0;
2291    time_t        now;
2292
2293    now = time( NULL );
2294    if( prevTime != now )
2295    {
2296        prevTime = now;
2297        newConnectionsThisSecond = 0;
2298    }
2299
2300    if( !t->isRunning )
2301    {
2302        removeAllPeers( t );
2303    }
2304    else
2305    {
2306        int i;
2307        int canCloseCount;
2308        int mustCloseCount;
2309        int candidateCount;
2310        int maxCandidates;
2311        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2312        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2313        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2314
2315        tordbg( t, "reconnect pulse for [%s]: "
2316                   "%d must-close connections, "
2317                   "%d can-close connections, "
2318                   "%d connection candidates, "
2319                   "%d atoms, "
2320                   "max per pulse is %d",
2321                   t->tor->info.name,
2322                   mustCloseCount,
2323                   canCloseCount,
2324                   candidateCount,
2325                   tr_ptrArraySize( &t->pool ),
2326                   MAX_RECONNECTIONS_PER_PULSE );
2327
2328        /* disconnect the really bad peers */
2329        for( i=0; i<mustCloseCount; ++i )
2330            closePeer( t, mustClose[i] );
2331
2332        /* decide how many peers can we try to add in this pass */
2333        maxCandidates = candidateCount;
2334        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2335        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2336        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2337
2338        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2339        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2340            closePeer( t, canClose[i] );
2341
2342        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2343                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2344                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2345                   candidateCount,
2346                   MAX_RECONNECTIONS_PER_PULSE,
2347                   getPeerCount( t ),
2348                   getMaxPeerCount( t->tor ),
2349                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2350
2351        /* add some new ones */
2352        for( i=0; i<maxCandidates; ++i )
2353        {
2354            tr_peerMgr        * mgr = t->manager;
2355            struct peer_atom  * atom = candidates[i];
2356            tr_peerIo         * io;
2357
2358            tordbg( t, "Starting an OUTGOING connection with %s",
2359                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2360
2361            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2362
2363            if( io == NULL )
2364            {
2365                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2366                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2367                atom->myflags |= MYFLAG_UNREACHABLE;
2368            }
2369            else
2370            {
2371                tr_handshake * handshake = tr_handshakeNew( io,
2372                                                            mgr->session->encryptionMode,
2373                                                            myHandshakeDoneCB,
2374                                                            mgr );
2375
2376                assert( tr_peerIoGetTorrentHash( io ) );
2377
2378                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2379
2380                ++newConnectionsThisSecond;
2381
2382                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2383                                         handshakeCompare );
2384            }
2385
2386            atom->time = time( NULL );
2387        }
2388
2389        /* cleanup */
2390        tr_free( candidates );
2391        tr_free( mustClose );
2392        tr_free( canClose );
2393    }
2394}
2395
2396static int
2397reconnectPulse( void * vmgr )
2398{
2399    tr_torrent * tor = NULL;
2400    tr_peerMgr * mgr = vmgr;
2401    managerLock( mgr );
2402
2403    while(( tor = tr_torrentNext( mgr->session, tor )))
2404        if( tor->isRunning )
2405            reconnectTorrent( tor->torrentPeers );
2406
2407    managerUnlock( mgr );
2408    return TRUE;
2409}
2410
2411/****
2412*****
2413*****  BANDWIDTH ALLOCATION
2414*****
2415****/
2416
2417static void
2418pumpAllPeers( tr_peerMgr * mgr )
2419{
2420    tr_torrent * tor = NULL;
2421
2422    while(( tor = tr_torrentNext( mgr->session, tor )))
2423    {
2424        int j;
2425        Torrent * t = tor->torrentPeers;
2426
2427        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2428        {
2429            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2430            tr_peerMsgsPulse( peer->msgs );
2431        }
2432    }
2433}
2434
2435static int
2436bandwidthPulse( void * vmgr )
2437{
2438    tr_torrent * tor = NULL;
2439    tr_peerMgr * mgr = vmgr;
2440    managerLock( mgr );
2441
2442    /* FIXME: this next line probably isn't necessary... */
2443    pumpAllPeers( mgr );
2444
2445    /* allocate bandwidth to the peers */
2446    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2447    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2448
2449    /* possibly stop torrents that have seeded enough */
2450    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2451        if( tor->needsSeedRatioCheck ) {
2452            tor->needsSeedRatioCheck = FALSE;
2453            tr_torrentCheckSeedRatio( tor );
2454        }
2455    }
2456
2457    managerUnlock( mgr );
2458    return TRUE;
2459}
Note: See TracBrowser for help on using the repository browser.